From 6a12760c99293eb26a3924169887c1e434217b37 Mon Sep 17 00:00:00 2001
From: BernardNganduDev
Date: Mon, 6 Oct 2025 16:59:38 +0200
Subject: [PATCH] Async support via Redis and RQ

---
 projects/crawler/src/basango/cli.py                |  86 +++++-
 .../crawler/src/basango/services/__init__.py       |  12 +-
 .../services/crawler/async/__init__.py             |  22 ++
 .../basango/services/crawler/async/queue.py        |  82 ++++++
 .../basango/services/crawler/async/schemas.py      |  55 ++++
 .../basango/services/crawler/async/tasks.py        | 269 ++++++++++++++++++
 .../basango/services/crawler/async/worker.py       |  29 ++
 .../src/basango/services/crawler/async_api.py      |  33 +++
 .../basango/services/crawler/base_crawler.py       |  39 ++-
 .../basango/services/crawler/html_crawler.py       |  10 +-
 .../services/crawler/wordpress_crawler.py          |  12 +-
 .../src/basango/services/http_client.py            | 152 ----------
 .../basango/services/http_client/__init__.py       |   9 +
 .../services/http_client/async_http_client.py      | 123 ++++++++
 .../services/http_client/base_http_client.py       |  90 ++++++
 .../services/http_client/sync_http_client.py       | 109 +++++++
 .../src/basango/services/open_graph.py             |   4 +-
 .../basango/services/persistence/__init__.py       |  11 +
 .../services/persistence/api_persistor.py          |  35 +++
 .../services/persistence/base_persistor.py         |  16 ++
 .../services/persistence/csv_persistor.py          |  69 +++++
 .../services/persistence/json_persistor.py         |  30 ++
 22 files changed, 1123 insertions(+), 174 deletions(-)
 create mode 100644 projects/crawler/src/basango/services/crawler/async/__init__.py
 create mode 100644 projects/crawler/src/basango/services/crawler/async/queue.py
 create mode 100644 projects/crawler/src/basango/services/crawler/async/schemas.py
 create mode 100644 projects/crawler/src/basango/services/crawler/async/tasks.py
 create mode 100644 projects/crawler/src/basango/services/crawler/async/worker.py
 create mode 100644 projects/crawler/src/basango/services/crawler/async_api.py
 delete mode 100644 projects/crawler/src/basango/services/http_client.py
 create mode 100644 projects/crawler/src/basango/services/http_client/__init__.py
 create mode 100644 projects/crawler/src/basango/services/http_client/async_http_client.py
 create mode 100644 projects/crawler/src/basango/services/http_client/base_http_client.py
 create mode 100644 projects/crawler/src/basango/services/http_client/sync_http_client.py
 create mode 100644 projects/crawler/src/basango/services/persistence/__init__.py
 create mode 100644 projects/crawler/src/basango/services/persistence/api_persistor.py
 create mode 100644 projects/crawler/src/basango/services/persistence/base_persistor.py
 create mode 100644 projects/crawler/src/basango/services/persistence/csv_persistor.py
 create mode 100644 projects/crawler/src/basango/services/persistence/json_persistor.py

diff --git a/projects/crawler/src/basango/cli.py b/projects/crawler/src/basango/cli.py
index 91aa5c3..8746b73 100644
--- a/projects/crawler/src/basango/cli.py
+++ b/projects/crawler/src/basango/cli.py
@@ -1,8 +1,18 @@
+from __future__ import annotations
+
+from typing import List, Optional
+
 import typer
 
 from basango.core.config import CrawlerConfig
 from basango.core.config_manager import ConfigManager
-from basango.domain import PageRange, DateRange, UpdateDirection
+from basango.domain import DateRange, PageRange, UpdateDirection
+from basango.services import CsvPersistor
+from basango.services.crawler.async_api import (
+    QueueSettings,
+    schedule_async_crawl,
+    start_worker,
+)
 from basango.services.crawler.html_crawler import HtmlCrawler
 from basango.services.crawler.wordpress_crawler import WordpressCrawler
 
@@ -21,16 +31,34 @@ def crawl_cmd(
     category: str = typer.Option(None, "--category", "-g", help="Optional category"),
     notify: bool = typer.Option(False, "--notify", "-n", help="Enable notifications"),
     env: str = typer.Option("development", "--env", "-c", help="Environment"),
+    async_mode: bool = typer.Option(
+        False,
+        "--async/--no-async",
+        help="Schedule crawl through Redis queues instead of running synchronously.",
+    ),
 ) -> None:
-    """Crawl a single source based on CLI-provided settings."""
+    """Crawl a single source, either synchronously or via the async queue."""
     manager = ConfigManager()
-
     pipeline = manager.get(env)
     manager.ensure_directories(pipeline)
     manager.setup_logging(pipeline)
 
     source = pipeline.sources.find(source_id)
-    assert source is not None, f"Source '{source_id}' not found in config"
+    if source is None:
+        raise typer.BadParameter(f"Source '{source_id}' not found in config")
+
+    if async_mode:
+        job_id = schedule_async_crawl(
+            source_id=source_id,
+            env=env,
+            page_range=page,
+            date_range=date,
+            category=category,
+        )
+        typer.echo(
+            f"Scheduled async crawl job {job_id} for source '{source_id}' on queue"
+        )
+        return
 
     crawler_config = CrawlerConfig(
         source=source,
@@ -46,8 +74,56 @@
         WordpressCrawler,
     ]
 
+    source_identifier = getattr(source, "source_id", source_id) or source_id
+    persistors = [
+        CsvPersistor(
+            data_dir=pipeline.paths.data,
+            source_id=str(source_identifier),
+        )
+    ]
+
     for crawler in crawlers:
         if crawler.supports() == source.source_kind:
-            crawler = crawler(crawler_config, pipeline.fetch.client)
+            crawler = crawler(
+                crawler_config,
+                pipeline.fetch.client,
+                persistors=persistors,
+            )
             crawler.fetch()
             break
+
+
+@app.command("worker")
+def worker_cmd(
+    queue: Optional[List[str]] = typer.Option(
+        None,
+        "--queue",
+        "-q",
+        help="Queue name(s) (without prefix). Provide multiple times to listen to more than one queue.",
+    ),
+    burst: bool = typer.Option(
+        False,
+        "--burst",
+        help="Process available jobs and exit instead of running continuously.",
+    ),
+    redis_url: Optional[str] = typer.Option(
+        None,
+        "--redis-url",
+        help="Redis connection URL. 
Defaults to BASANGO_REDIS_URL.", + ), + env: str = typer.Option( + "development", + "--env", + "-c", + help="Environment used to configure logging before starting the worker.", + ), +) -> None: + """Run an RQ worker that consumes crawler queues.""" + manager = ConfigManager() + pipeline = manager.get(env) + manager.ensure_directories(pipeline) + manager.setup_logging(pipeline) + + settings = QueueSettings(redis_url=redis_url) if redis_url else QueueSettings() + queue_names = list(queue) if queue else None + start_worker(queue_names=queue_names, settings=settings, burst=burst) diff --git a/projects/crawler/src/basango/services/__init__.py b/projects/crawler/src/basango/services/__init__.py index 22b0a04..6dee296 100644 --- a/projects/crawler/src/basango/services/__init__.py +++ b/projects/crawler/src/basango/services/__init__.py @@ -1,11 +1,21 @@ from .date_parser import DateParser -from .http_client import HttpClient +from .http_client import BaseHttpClient, SyncHttpClient, AsyncHttpClient from .open_graph import OpenGraphProvider +from .persistence import BasePersistor, CsvPersistor, JsonPersistor, ApiPersistor from .user_agents import UserAgentProvider +HttpClient = SyncHttpClient + __all__ = [ "DateParser", + "BaseHttpClient", + "SyncHttpClient", + "AsyncHttpClient", "HttpClient", "OpenGraphProvider", "UserAgentProvider", + "BasePersistor", + "CsvPersistor", + "JsonPersistor", + "ApiPersistor", ] diff --git a/projects/crawler/src/basango/services/crawler/async/__init__.py b/projects/crawler/src/basango/services/crawler/async/__init__.py new file mode 100644 index 0000000..72407df --- /dev/null +++ b/projects/crawler/src/basango/services/crawler/async/__init__.py @@ -0,0 +1,22 @@ +from .queue import QueueManager, QueueSettings +from .schemas import ListingTaskPayload, ArticleTaskPayload, ProcessedTaskPayload +from .tasks import ( + schedule_async_crawl, + collect_listing, + collect_article, + forward_for_processing, +) +from .worker import start_worker + +__all__ = [ + "QueueManager", + "QueueSettings", + "ListingTaskPayload", + "ArticleTaskPayload", + "ProcessedTaskPayload", + "schedule_async_crawl", + "collect_listing", + "collect_article", + "forward_for_processing", + "start_worker", +] diff --git a/projects/crawler/src/basango/services/crawler/async/queue.py b/projects/crawler/src/basango/services/crawler/async/queue.py new file mode 100644 index 0000000..ce627e0 --- /dev/null +++ b/projects/crawler/src/basango/services/crawler/async/queue.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from typing import Iterable + +from redis import Redis +from rq import Queue + +from .schemas import ( + ArticleTaskPayload, + ListingTaskPayload, + ProcessedTaskPayload, +) + + +@dataclass(slots=True) +class QueueSettings: + redis_url: str = field( + default_factory=lambda: os.getenv( # type: ignore[arg-type] + "BASANGO_REDIS_URL", "redis://localhost:6379/0" + ) + ) + prefix: str = field( + default_factory=lambda: os.getenv("BASANGO_QUEUE_PREFIX", "crawler") + ) + default_timeout: int = field( + default_factory=lambda: int(os.getenv("BASANGO_QUEUE_TIMEOUT", "600")) + ) + result_ttl: int = field( + default_factory=lambda: int(os.getenv("BASANGO_QUEUE_RESULT_TTL", "3600")) + ) + failure_ttl: int = field( + default_factory=lambda: int(os.getenv("BASANGO_QUEUE_FAILURE_TTL", "3600")) + ) + listing_queue: str = "listing" + article_queue: str = "articles" + processed_queue: str = "processed" + + +class QueueManager: + def __init__(self, 
settings: QueueSettings | None = None) -> None: + self.settings = settings or QueueSettings() + self.connection = Redis.from_url(self.settings.redis_url) + self.listing_queue = self._build_queue(self.settings.listing_queue) + self.article_queue = self._build_queue(self.settings.article_queue) + self.processed_queue = self._build_queue(self.settings.processed_queue) + + def _build_queue(self, suffix: str) -> Queue: + return Queue( + self.queue_name(suffix), + connection=self.connection, + default_timeout=self.settings.default_timeout, + result_ttl=self.settings.result_ttl, + failure_ttl=self.settings.failure_ttl, + ) + + def queue_name(self, suffix: str) -> str: + return f"{self.settings.prefix}:{suffix}" + + def enqueue_listing(self, payload: ListingTaskPayload): + return self.listing_queue.enqueue( + "basango.services.crawler.async.tasks.collect_listing", + payload.to_dict(), + ) + + def enqueue_article(self, payload: ArticleTaskPayload): + return self.article_queue.enqueue( + "basango.services.crawler.async.tasks.collect_article", + payload.to_dict(), + ) + + def enqueue_processed(self, payload: ProcessedTaskPayload): + return self.processed_queue.enqueue( + "basango.services.crawler.async.tasks.forward_for_processing", + payload.to_dict(), + ) + + def iter_queue_names(self) -> Iterable[str]: + yield self.queue_name(self.settings.listing_queue) + yield self.queue_name(self.settings.article_queue) + yield self.queue_name(self.settings.processed_queue) diff --git a/projects/crawler/src/basango/services/crawler/async/schemas.py b/projects/crawler/src/basango/services/crawler/async/schemas.py new file mode 100644 index 0000000..420dc53 --- /dev/null +++ b/projects/crawler/src/basango/services/crawler/async/schemas.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from dataclasses import asdict, dataclass, fields +from typing import Any, Mapping + + +def _coerce_kwargs(cls, data: Mapping[str, Any]) -> dict[str, Any]: + return {field.name: data.get(field.name) for field in fields(cls)} + + +@dataclass(slots=True) +class ListingTaskPayload: + source_id: str + env: str = "development" + page_range: str | None = None + date_range: str | None = None + category: str | None = None + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "ListingTaskPayload": + return cls(**_coerce_kwargs(cls, data)) + + +@dataclass(slots=True) +class ArticleTaskPayload: + source_id: str + env: str = "development" + url: str | None = None + data: Any | None = None + date_range: str | None = None + category: str | None = None + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "ArticleTaskPayload": + return cls(**_coerce_kwargs(cls, data)) + + +@dataclass(slots=True) +class ProcessedTaskPayload: + source_id: str + env: str = "development" + article: Mapping[str, Any] | None = None + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "ProcessedTaskPayload": + return cls(**_coerce_kwargs(cls, data)) diff --git a/projects/crawler/src/basango/services/crawler/async/tasks.py b/projects/crawler/src/basango/services/crawler/async/tasks.py new file mode 100644 index 0000000..399376c --- /dev/null +++ b/projects/crawler/src/basango/services/crawler/async/tasks.py @@ -0,0 +1,269 @@ +from __future__ import annotations + +import logging +from typing import Any + +from basango.core.config import 
CrawlerConfig +from basango.core.config_manager import ConfigManager +from basango.domain import DateRange, PageRange, SourceKind, UpdateDirection +from basango.services import CsvPersistor +from basango.services.crawler.html_crawler import HtmlCrawler +from basango.services.crawler.wordpress_crawler import WordpressCrawler + +from .queue import QueueManager, QueueSettings +from .schemas import ( + ArticleTaskPayload, + ListingTaskPayload, + ProcessedTaskPayload, +) + + +logger = logging.getLogger(__name__) + + +def schedule_async_crawl( + *, + source_id: str, + env: str = "development", + page_range: str | None = None, + date_range: str | None = None, + category: str | None = None, + settings: QueueSettings | None = None, +): + payload = ListingTaskPayload( + source_id=source_id, + env=env, + page_range=page_range, + date_range=date_range, + category=category, + ) + manager = QueueManager(settings=settings) + job = manager.enqueue_listing(payload) + logger.info("Scheduled listing collection job %s for source %s", job.id, source_id) + return job.id + + +def collect_listing(payload: dict[str, Any]) -> int: + data = ListingTaskPayload.from_dict(payload) + manager = ConfigManager() + pipeline = manager.get(data.env) + source = pipeline.sources.find(data.source_id) + if source is None: + logger.error("Unknown source id %s", data.source_id) + return 0 + + crawler_config = CrawlerConfig( + source=source, + page_range=PageRange.create(data.page_range) if data.page_range else None, + date_range=DateRange.create(data.date_range) if data.date_range else None, + category=data.category, + notify=False, + direction=UpdateDirection.FORWARD, + ) + client_config = pipeline.fetch.client + queue_manager = QueueManager() + + if source.source_kind == SourceKind.HTML: + crawler = HtmlCrawler(crawler_config, client_config) + queued = _collect_html_listing(crawler, data, queue_manager) + elif source.source_kind == SourceKind.WORDPRESS: + crawler = WordpressCrawler(crawler_config, client_config) + queued = _collect_wordpress_listing(crawler, data, queue_manager) + else: + logger.warning( + "Async crawling not supported for source kind %s", source.source_kind + ) + queued = 0 + + logger.info("Queued %s article detail jobs for source %s", queued, data.source_id) + return queued + + +def collect_article(payload: dict[str, Any]) -> dict[str, Any] | None: + data = ArticleTaskPayload.from_dict(payload) + manager = ConfigManager() + pipeline = manager.get(data.env) + source = pipeline.sources.find(data.source_id) + if source is None: + logger.error("Unknown source id %s", data.source_id) + return None + + crawler_config = CrawlerConfig( + source=source, + date_range=DateRange.create(data.date_range) if data.date_range else None, + category=data.category, + notify=False, + direction=UpdateDirection.FORWARD, + ) + + source_identifier = getattr(source, "source_id", data.source_id) or data.source_id + persistors = [ + CsvPersistor( + data_dir=pipeline.paths.data, + source_id=str(source_identifier), + ) + ] + + queue_manager = QueueManager() + + if source.source_kind == SourceKind.HTML: + article = _collect_html_article( + HtmlCrawler(crawler_config, pipeline.fetch.client, persistors=persistors), + data, + ) + elif source.source_kind == SourceKind.WORDPRESS: + article = _collect_wordpress_article( + WordpressCrawler( + crawler_config, pipeline.fetch.client, persistors=persistors + ), + data, + ) + else: + logger.warning( + "Async crawling not supported for source kind %s", source.source_kind + ) + article = None + + if 
article: + queue_manager.enqueue_processed( + ProcessedTaskPayload( + source_id=data.source_id, + env=data.env, + article=article, + ) + ) + logger.info( + "Persisted article %s and forwarded to processed queue", + article.get("link"), + ) + + return article + + +def forward_for_processing(payload: dict[str, Any]) -> dict[str, Any] | None: + data = ProcessedTaskPayload.from_dict(payload) + article = dict(data.article) if data.article is not None else None + if article is None: + logger.info( + "Ready for downstream processing: source=%s (no article)", data.source_id + ) + return None + logger.info( + "Ready for downstream processing: source=%s link=%s", + data.source_id, + article.get("link"), + ) + return article + + +def _collect_html_listing( + crawler: HtmlCrawler, + payload: ListingTaskPayload, + queue_manager: QueueManager, +) -> int: + source = crawler.source + selector = source.source_selectors.articles + if not selector: + logger.warning( + "No article selector configured for HTML source %s", + source.source_id, + ) + return 0 + + page_range = crawler.config.page_range or crawler.get_pagination() + queued = 0 + + for page in range(page_range.start, page_range.end + 1): + page_url = crawler._build_page_url(page) + try: + soup = crawler.crawl(page_url, page) + except Exception as exc: # noqa: BLE001 + logger.exception("Failed to crawl page %s: %s", page_url, exc) + continue + + for node in soup.select(selector): + link = crawler._extract_link(node) + if not link: + continue + queue_manager.enqueue_article( + ArticleTaskPayload( + source_id=payload.source_id, + env=payload.env, + url=link, + date_range=payload.date_range, + category=payload.category, + ) + ) + queued += 1 + + return queued + + +def _collect_wordpress_listing( + crawler: WordpressCrawler, + payload: ListingTaskPayload, + queue_manager: QueueManager, +) -> int: + page_range = crawler.config.page_range or crawler.get_pagination() + queued = 0 + + for page in range(page_range.start, page_range.end + 1): + endpoint = crawler._posts_endpoint(page) + try: + response = crawler.client.get(endpoint) + articles = response.json() + except Exception as exc: # noqa: BLE001 + logger.exception("Failed to fetch WordPress page %s: %s", endpoint, exc) + continue + + if not isinstance(articles, list): + logger.warning("Unexpected WordPress payload type: %s", type(articles)) + continue + + for entry in articles: + queue_manager.enqueue_article( + ArticleTaskPayload( + source_id=payload.source_id, + env=payload.env, + url=entry.get("link"), + data=entry, + date_range=payload.date_range, + category=payload.category, + ) + ) + queued += 1 + + return queued + + +def _collect_html_article( + crawler: HtmlCrawler, + payload: ArticleTaskPayload, +) -> dict[str, Any] | None: + if not payload.url: + logger.warning("Missing article url for HTML source %s", payload.source_id) + return None + + crawler._current_article_url = payload.url # type: ignore[attr-defined] + try: + soup = crawler.crawl(payload.url) + except Exception as exc: # noqa: BLE001 + logger.exception("Failed to crawl article %s: %s", payload.url, exc) + return None + + crawler.fetch_one(str(soup), crawler.config.date_range) + crawler.completed(False) + return None + + +def _collect_wordpress_article( + crawler: WordpressCrawler, + payload: ArticleTaskPayload, +) -> dict[str, Any] | None: + if payload.data is None: + logger.warning("Missing WordPress payload for source %s", payload.source_id) + return None + + crawler.fetch_one(payload.data, crawler.config.date_range) + 
crawler.completed(False) + return None diff --git a/projects/crawler/src/basango/services/crawler/async/worker.py b/projects/crawler/src/basango/services/crawler/async/worker.py new file mode 100644 index 0000000..87a9078 --- /dev/null +++ b/projects/crawler/src/basango/services/crawler/async/worker.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +import logging +from typing import Sequence + +from rq import Queue, Worker + +from .queue import QueueManager, QueueSettings + + +logger = logging.getLogger(__name__) + + +def start_worker( + queue_names: Sequence[str] | None = None, + *, + settings: QueueSettings | None = None, + burst: bool = False, +) -> None: + manager = QueueManager(settings=settings) + if queue_names is None or not list(queue_names): + queue_names = [manager.settings.article_queue] + + resolved = [manager.queue_name(name) for name in queue_names] + queues = [Queue(name, connection=manager.connection) for name in resolved] + + logger.info("Starting RQ worker for queues %s", ", ".join(resolved)) + worker = Worker(queues, connection=manager.connection) + worker.work(burst=burst) diff --git a/projects/crawler/src/basango/services/crawler/async_api.py b/projects/crawler/src/basango/services/crawler/async_api.py new file mode 100644 index 0000000..000b4c1 --- /dev/null +++ b/projects/crawler/src/basango/services/crawler/async_api.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from importlib import import_module +from typing import Any, Sequence + +_async_queue = import_module("basango.services.crawler.async.queue") +_async_tasks = import_module("basango.services.crawler.async.tasks") +_async_worker = import_module("basango.services.crawler.async.worker") +_async_schemas = import_module("basango.services.crawler.async.schemas") + +QueueManager = getattr(_async_queue, "QueueManager") +QueueSettings = getattr(_async_queue, "QueueSettings") +ListingTaskPayload = getattr(_async_schemas, "ListingTaskPayload") +ArticleTaskPayload = getattr(_async_schemas, "ArticleTaskPayload") +ProcessedTaskPayload = getattr(_async_schemas, "ProcessedTaskPayload") +schedule_async_crawl = getattr(_async_tasks, "schedule_async_crawl") +collect_listing = getattr(_async_tasks, "collect_listing") +collect_article = getattr(_async_tasks, "collect_article") +forward_for_processing = getattr(_async_tasks, "forward_for_processing") +start_worker = getattr(_async_worker, "start_worker") + +__all__ = [ + "QueueManager", + "QueueSettings", + "ListingTaskPayload", + "ArticleTaskPayload", + "ProcessedTaskPayload", + "schedule_async_crawl", + "collect_listing", + "collect_article", + "forward_for_processing", + "start_worker", +] diff --git a/projects/crawler/src/basango/services/crawler/base_crawler.py b/projects/crawler/src/basango/services/crawler/base_crawler.py index 51fa6b7..a489706 100644 --- a/projects/crawler/src/basango/services/crawler/base_crawler.py +++ b/projects/crawler/src/basango/services/crawler/base_crawler.py @@ -1,24 +1,27 @@ import logging from abc import ABC, abstractmethod from dataclasses import asdict, is_dataclass -from typing import Optional, Any, Dict, List +from typing import Optional, Any, Dict, List, Sequence from bs4 import BeautifulSoup from basango.core.config import CrawlerConfig, ClientConfig from basango.domain import DateRange, SourceKind, PageRange from basango.domain.exception import ArticleOutOfRange -from basango.services import HttpClient, DateParser, OpenGraphProvider +from basango.services import HttpClient, DateParser, OpenGraphProvider, 
BasePersistor class BaseCrawler(ABC): def __init__( - self, crawler_config: CrawlerConfig, client_config: ClientConfig + self, + crawler_config: CrawlerConfig, + client_config: ClientConfig, + persistors: Sequence[BasePersistor] | None = None, ) -> None: self.config = crawler_config self.source = crawler_config.source self.client = HttpClient(client_config=client_config) - self.results: List[Dict[str, Any]] = [] + self.persistors: list[BasePersistor] = list(persistors) if persistors else [] self.date_parser = DateParser() self.open_graph = OpenGraphProvider() @@ -49,6 +52,7 @@ class BaseCrawler(ABC): metadata_value = asdict(metadata) else: metadata_value = metadata + article = { "title": title, "link": link, @@ -58,7 +62,7 @@ class BaseCrawler(ABC): "timestamp": timestamp, "metadata": metadata_value, } - self.results.append(article) + self._persist(article) logging.info(f"> {title} [saved]") @abstractmethod @@ -85,7 +89,8 @@ class BaseCrawler(ABC): logging.info("Crawling completed") if notify: logging.info("Sending notification about completion") - # Implement notification logic here + # TODO: Implement notification logic here + self._shutdown_persistors() @classmethod def skip(cls, date_range: DateRange, timestamp: str, title: str, date: str) -> None: @@ -93,3 +98,25 @@ class BaseCrawler(ABC): raise ArticleOutOfRange.create(timestamp, date_range) logging.warning(f"> {title} [Skipped {date}]") + + def _persist(self, article: Dict[str, Any]) -> None: + for persistor in self.persistors: + try: + persistor.persist(article) + except Exception as exc: # noqa: BLE001 + logging.exception( + "Failed to persist article via %s: %s", + persistor.__class__.__name__, + exc, + ) + + def _shutdown_persistors(self) -> None: + for persistor in self.persistors: + try: + persistor.close() + except Exception as exc: # noqa: BLE001 + logging.exception( + "Failed to close persistor %s: %s", + persistor.__class__.__name__, + exc, + ) diff --git a/projects/crawler/src/basango/services/crawler/html_crawler.py b/projects/crawler/src/basango/services/crawler/html_crawler.py index 3481e1f..271757e 100644 --- a/projects/crawler/src/basango/services/crawler/html_crawler.py +++ b/projects/crawler/src/basango/services/crawler/html_crawler.py @@ -1,7 +1,7 @@ import logging import re from datetime import datetime, timezone -from typing import Optional, cast, override +from typing import Optional, cast, override, Sequence from urllib.parse import parse_qs, urljoin, urlparse from bs4 import BeautifulSoup, Tag @@ -11,13 +11,17 @@ from basango.core.config.source_config import HtmlSourceConfig from basango.domain import DateRange, PageRange, SourceKind from basango.domain.exception import ArticleOutOfRange from basango.services.crawler.base_crawler import BaseCrawler +from basango.services import BasePersistor class HtmlCrawler(BaseCrawler): def __init__( - self, crawler_config: CrawlerConfig, client_config: ClientConfig + self, + crawler_config: CrawlerConfig, + client_config: ClientConfig, + persistors: Sequence[BasePersistor] | None = None, ) -> None: - super().__init__(crawler_config, client_config) + super().__init__(crawler_config, client_config, persistors=persistors) if not self.source or self.source.source_kind != SourceKind.HTML: raise ValueError("HtmlCrawler requires a source of kind HTML") diff --git a/projects/crawler/src/basango/services/crawler/wordpress_crawler.py b/projects/crawler/src/basango/services/crawler/wordpress_crawler.py index e4de74b..ebe40c0 100644 --- 
a/projects/crawler/src/basango/services/crawler/wordpress_crawler.py +++ b/projects/crawler/src/basango/services/crawler/wordpress_crawler.py @@ -1,7 +1,7 @@ import json import logging from datetime import datetime, timezone -from typing import Optional, override, cast, Final, Any +from typing import Optional, override, cast, Final, Any, Sequence from bs4 import BeautifulSoup @@ -9,13 +9,17 @@ from basango.core.config import WordPressSourceConfig, CrawlerConfig, ClientConf from basango.domain import SourceKind, PageRange, DateRange from basango.domain.exception import ArticleOutOfRange from basango.services.crawler.base_crawler import BaseCrawler +from basango.services import BasePersistor class WordpressCrawler(BaseCrawler): def __init__( - self, crawler_config: CrawlerConfig, client_config: ClientConfig + self, + crawler_config: CrawlerConfig, + client_config: ClientConfig, + persistors: Sequence[BasePersistor] | None = None, ) -> None: - super().__init__(crawler_config, client_config) + super().__init__(crawler_config, client_config, persistors=persistors) if not self.source or self.source.source_kind != SourceKind.WORDPRESS: raise ValueError("WordpressCrawler requires a source of kind WORDPRESS") @@ -162,9 +166,7 @@ class WordpressCrawler(BaseCrawler): def get_last_page(self) -> int: return 1 - @staticmethod @staticmethod @override def supports() -> SourceKind: return SourceKind.WORDPRESS - diff --git a/projects/crawler/src/basango/services/http_client.py b/projects/crawler/src/basango/services/http_client.py deleted file mode 100644 index d1a9f87..0000000 --- a/projects/crawler/src/basango/services/http_client.py +++ /dev/null @@ -1,152 +0,0 @@ -import random -import time -from dataclasses import dataclass -from datetime import datetime, timezone -from email.utils import parsedate_to_datetime -from typing import Any, Optional, TypeAlias - -import httpx - -from basango.core.config import ClientConfig -from basango.services.user_agents import UserAgentProvider - -HttpHeaders: TypeAlias = dict[str, str] | None -HttpParams: TypeAlias = dict[str, Any] | None -HttpData: TypeAlias = Any | None - -TRANSIENT_STATUSES = (429, 500, 502, 503, 504) - - -@dataclass -class HttpClient: - client_config: ClientConfig - user_agent_provider: UserAgentProvider | None = None - default_headers: HttpHeaders = None - - def _compute_backoff(self, attempt: int) -> float: - base = min( - self.client_config.backoff_initial - * (self.client_config.backoff_multiplier**attempt), - self.client_config.backoff_max, - ) - jitter = random.uniform(0, base * 0.25) - return base + jitter - - def _retry_delay( - self, attempt: int, response: Optional[httpx.Response] = None - ) -> float: - delay = 0.0 - - if response is not None and self.client_config.respect_retry_after: - retry_after = ( - response.headers.get("Retry-After") if response.headers else None - ) - if retry_after: - try: - delay = max(0.0, float(int(retry_after))) - except ValueError: - try: - dt = parsedate_to_datetime(retry_after) - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - now = datetime.now(timezone.utc) - delay = max(0.0, (dt - now).total_seconds()) - except Exception: # noqa: BLE001 - pass - - if delay == 0.0: - delay = self._compute_backoff(attempt) - - return delay - - def __post_init__(self) -> None: - if self.user_agent_provider is not None: - user_agent = self.user_agent_provider.get() - self._user_agent = ( - user_agent if user_agent else self.client_config.user_agent - ) - else: - provider = UserAgentProvider( - 
rotate=self.client_config.rotate, - fallback=self.client_config.user_agent, - ) - user_agent = provider.get() - self._user_agent = ( - user_agent if user_agent else self.client_config.user_agent - ) - - headers = {"User-Agent": self._user_agent} - - if self.default_headers: - headers.update(self.default_headers) - - self._client = httpx.Client( - follow_redirects=self.client_config.follow_redirects, - max_redirects=5, - verify=self.client_config.verify_ssl, - timeout=self.client_config.timeout, - headers=headers, - ) - - # Context manager support ------------------------------------------------- - def __enter__(self) -> "HttpClient": # noqa: D401 - return self - - def __exit__(self, exc_type, exc, tb) -> None: # noqa: D401 - self.close() - - def close(self) -> None: - try: - self._client.close() - except Exception: # noqa: BLE001 - pass - - # Core request with retries ---------------------------------------------- - def _request( - self, - method: str, - url: str, - *, - headers: HttpHeaders = None, - params: HttpParams = None, - data: Any | None = None, - json: Any | None = None, - ) -> httpx.Response: - attempt = 0 - while True: - try: - response = self._client.request( - method, url, headers=headers, params=params, data=data, json=json - ) - if ( - response.status_code in TRANSIENT_STATUSES - ) and attempt < self.client_config.max_retries: - time.sleep(self._retry_delay(attempt, response)) - attempt += 1 - continue - response.raise_for_status() - return response - except httpx.HTTPStatusError as e: - status = e.response.status_code if e.response else 0 - if ( - status in TRANSIENT_STATUSES - ) and attempt < self.client_config.max_retries: - time.sleep(self._retry_delay(attempt, e.response)) - attempt += 1 - continue - raise - except httpx.RequestError: - if attempt < self.client_config.max_retries: - time.sleep(self._compute_backoff(attempt)) - attempt += 1 - continue - raise - - # Public helpers ---------------------------------------------------------- - def get(self, url: str) -> httpx.Response: - return self._request("GET", url) - - def post( - self, url: str, data: HttpData = None, json: HttpData = None - ) -> httpx.Response: - return self._request("POST", url, data=data, json=json) diff --git a/projects/crawler/src/basango/services/http_client/__init__.py b/projects/crawler/src/basango/services/http_client/__init__.py new file mode 100644 index 0000000..d0f18a2 --- /dev/null +++ b/projects/crawler/src/basango/services/http_client/__init__.py @@ -0,0 +1,9 @@ +from .base_http_client import BaseHttpClient +from .sync_http_client import SyncHttpClient +from .async_http_client import AsyncHttpClient + +__all__ = [ + "BaseHttpClient", + "SyncHttpClient", + "AsyncHttpClient", +] diff --git a/projects/crawler/src/basango/services/http_client/async_http_client.py b/projects/crawler/src/basango/services/http_client/async_http_client.py new file mode 100644 index 0000000..46b11f5 --- /dev/null +++ b/projects/crawler/src/basango/services/http_client/async_http_client.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field + +import httpx + +from .base_http_client import ( + BaseHttpClient, + HttpData, + HttpHeaders, + HttpParams, + TRANSIENT_STATUSES, +) + + +@dataclass +class AsyncHttpClient(BaseHttpClient): + _client: httpx.AsyncClient = field(init=False, repr=False) + + def __post_init__(self) -> None: + super().__post_init__() + self._client = httpx.AsyncClient( + follow_redirects=self.client_config.follow_redirects, + 
max_redirects=5, + verify=self.client_config.verify_ssl, + timeout=self.client_config.timeout, + headers=dict(self._headers), + ) + + async def __aenter__(self) -> "AsyncHttpClient": + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.aclose() + + def close(self) -> None: + if self._client.is_closed: + return + try: + loop = asyncio.get_running_loop() + except RuntimeError: # no running loop + asyncio.run(self.aclose()) + else: + loop.create_task(self.aclose()) + + async def aclose(self) -> None: + try: + await self._client.aclose() + except Exception: # noqa: BLE001 + pass + + async def _request( + self, + method: str, + url: str, + *, + headers: HttpHeaders = None, + params: HttpParams = None, + data: HttpData = None, + json: HttpData = None, + ) -> httpx.Response: + attempt = 0 + while True: + try: + response = await self._client.request( + method, + url, + headers=self._build_headers(headers), + params=params, + data=data, + json=json, + ) + if ( + response.status_code in TRANSIENT_STATUSES + ) and attempt < self.client_config.max_retries: + await asyncio.sleep(self._retry_delay(attempt, response)) + attempt += 1 + continue + response.raise_for_status() + return response + except httpx.HTTPStatusError as exc: + status = exc.response.status_code if exc.response else 0 + if ( + status in TRANSIENT_STATUSES + ) and attempt < self.client_config.max_retries: + await asyncio.sleep(self._retry_delay(attempt, exc.response)) + attempt += 1 + continue + raise + except httpx.RequestError: + if attempt < self.client_config.max_retries: + await asyncio.sleep(self._compute_backoff(attempt)) + attempt += 1 + continue + raise + + async def get( + self, + url: str, + *, + headers: HttpHeaders = None, + params: HttpParams = None, + ) -> httpx.Response: + return await self._request("GET", url, headers=headers, params=params) + + async def post( + self, + url: str, + *, + headers: HttpHeaders = None, + params: HttpParams = None, + data: HttpData = None, + json: HttpData = None, + ) -> httpx.Response: + return await self._request( + "POST", + url, + headers=headers, + params=params, + data=data, + json=json, + ) diff --git a/projects/crawler/src/basango/services/http_client/base_http_client.py b/projects/crawler/src/basango/services/http_client/base_http_client.py new file mode 100644 index 0000000..7d04b9d --- /dev/null +++ b/projects/crawler/src/basango/services/http_client/base_http_client.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import random +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime, timezone +from email.utils import parsedate_to_datetime +from typing import Any, Optional, TypeAlias + +import httpx + +from basango.core.config import ClientConfig +from basango.services.user_agents import UserAgentProvider + +HttpHeaders: TypeAlias = dict[str, str] | None +HttpParams: TypeAlias = dict[str, Any] | None +HttpData: TypeAlias = Any | None + +TRANSIENT_STATUSES = (429, 500, 502, 503, 504) + + +@dataclass +class BaseHttpClient(ABC): + client_config: ClientConfig + user_agent_provider: UserAgentProvider | None = None + default_headers: HttpHeaders = None + _user_agent: str = field(init=False, repr=False) + _headers: dict[str, str] = field(init=False, repr=False) + + def __post_init__(self) -> None: + provider = self.user_agent_provider or UserAgentProvider( + rotate=self.client_config.rotate, + fallback=self.client_config.user_agent, + ) + user_agent = provider.get() + 
self._user_agent = user_agent if user_agent else self.client_config.user_agent + + headers = {"User-Agent": self._user_agent} + if self.default_headers: + headers.update(self.default_headers) + self._headers = headers + + def _compute_backoff(self, attempt: int) -> float: + base = min( + self.client_config.backoff_initial + * (self.client_config.backoff_multiplier**attempt), + self.client_config.backoff_max, + ) + jitter = random.uniform(0, base * 0.25) + return base + jitter + + def _retry_delay( + self, attempt: int, response: Optional[httpx.Response] = None + ) -> float: + delay = 0.0 + if response is not None and self.client_config.respect_retry_after: + retry_after = ( + response.headers.get("Retry-After") if response.headers else None + ) + if retry_after: + delay = self._parse_retry_after(retry_after) + + if delay == 0.0: + delay = self._compute_backoff(attempt) + return delay + + @staticmethod + def _parse_retry_after(header_value: str) -> float: + try: + return max(0.0, float(int(header_value))) + except (TypeError, ValueError): + try: + dt = parsedate_to_datetime(header_value) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + now = datetime.now(timezone.utc) + return max(0.0, (dt - now).total_seconds()) + except Exception: # noqa: BLE001 + return 0.0 + + def _build_headers(self, headers: HttpHeaders = None) -> dict[str, str]: + merged = dict(self._headers) + if headers: + merged.update(headers) + return merged + + @abstractmethod + def close(self) -> None: # pragma: no cover - enforced by subclasses + """Close the underlying HTTPX client.""" diff --git a/projects/crawler/src/basango/services/http_client/sync_http_client.py b/projects/crawler/src/basango/services/http_client/sync_http_client.py new file mode 100644 index 0000000..dc971f7 --- /dev/null +++ b/projects/crawler/src/basango/services/http_client/sync_http_client.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import time +from dataclasses import dataclass, field + +import httpx + +from .base_http_client import ( + BaseHttpClient, + HttpData, + HttpHeaders, + HttpParams, + TRANSIENT_STATUSES, +) + + +@dataclass +class SyncHttpClient(BaseHttpClient): + _client: httpx.Client = field(init=False, repr=False) + + def __post_init__(self) -> None: + super().__post_init__() + self._client = httpx.Client( + follow_redirects=self.client_config.follow_redirects, + max_redirects=5, + verify=self.client_config.verify_ssl, + timeout=self.client_config.timeout, + headers=dict(self._headers), + ) + + def __enter__(self) -> "SyncHttpClient": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + def close(self) -> None: + try: + self._client.close() + except Exception: # noqa: BLE001 + pass + + def _request( + self, + method: str, + url: str, + *, + headers: HttpHeaders = None, + params: HttpParams = None, + data: HttpData = None, + json: HttpData = None, + ) -> httpx.Response: + attempt = 0 + while True: + try: + response = self._client.request( + method, + url, + headers=self._build_headers(headers), + params=params, + data=data, + json=json, + ) + if ( + response.status_code in TRANSIENT_STATUSES + ) and attempt < self.client_config.max_retries: + time.sleep(self._retry_delay(attempt, response)) + attempt += 1 + continue + response.raise_for_status() + return response + except httpx.HTTPStatusError as exc: + status = exc.response.status_code if exc.response else 0 + if ( + status in TRANSIENT_STATUSES + ) and attempt < self.client_config.max_retries: + 
time.sleep(self._retry_delay(attempt, exc.response)) + attempt += 1 + continue + raise + except httpx.RequestError: + if attempt < self.client_config.max_retries: + time.sleep(self._compute_backoff(attempt)) + attempt += 1 + continue + raise + + def get( + self, url: str, *, headers: HttpHeaders = None, params: HttpParams = None + ) -> httpx.Response: + return self._request("GET", url, headers=headers, params=params) + + def post( + self, + url: str, + *, + headers: HttpHeaders = None, + params: HttpParams = None, + data: HttpData = None, + json: HttpData = None, + ) -> httpx.Response: + return self._request( + "POST", + url, + headers=headers, + params=params, + data=data, + json=json, + ) diff --git a/projects/crawler/src/basango/services/open_graph.py b/projects/crawler/src/basango/services/open_graph.py index 73cec46..9ea3759 100644 --- a/projects/crawler/src/basango/services/open_graph.py +++ b/projects/crawler/src/basango/services/open_graph.py @@ -5,7 +5,7 @@ from typing import Optional import trafilatura from basango.core.config import ClientConfig -from basango.services.http_client import HttpClient +from basango.services.http_client import SyncHttpClient from basango.services.user_agents import UserAgentProvider @@ -22,7 +22,7 @@ class OpenGraphProvider: self, user_agent_provider: UserAgentProvider = UserAgentProvider(rotate=False) ) -> None: self._user_agent = user_agent_provider.og() - self._http_client = HttpClient( + self._http_client = SyncHttpClient( client_config=ClientConfig(), default_headers={"User-Agent": self._user_agent}, ) diff --git a/projects/crawler/src/basango/services/persistence/__init__.py b/projects/crawler/src/basango/services/persistence/__init__.py new file mode 100644 index 0000000..5e4bb1f --- /dev/null +++ b/projects/crawler/src/basango/services/persistence/__init__.py @@ -0,0 +1,11 @@ +from .base_persistor import BasePersistor +from .csv_persistor import CsvPersistor +from .json_persistor import JsonPersistor +from .api_persistor import ApiPersistor + +__all__ = [ + "BasePersistor", + "CsvPersistor", + "JsonPersistor", + "ApiPersistor", +] diff --git a/projects/crawler/src/basango/services/persistence/api_persistor.py b/projects/crawler/src/basango/services/persistence/api_persistor.py new file mode 100644 index 0000000..1a49dc3 --- /dev/null +++ b/projects/crawler/src/basango/services/persistence/api_persistor.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Any, Mapping + +from basango.services.http_client import SyncHttpClient + +from .base_persistor import BasePersistor + + +@dataclass +class ApiPersistor(BasePersistor): + endpoint: str + http_client: SyncHttpClient + headers: dict[str, str] | None = None + raise_for_status: bool = True + + def persist(self, article: Mapping[str, Any]) -> None: + try: + response = self.http_client.post( + self.endpoint, + json=article, + headers=self.headers, + ) + if self.raise_for_status: + response.raise_for_status() + except Exception as exc: # noqa: BLE001 + logging.exception( + "[ApiPersistor] Failed to persist article at %s: %s", + self.endpoint, + exc, + ) + if self.raise_for_status: + raise diff --git a/projects/crawler/src/basango/services/persistence/base_persistor.py b/projects/crawler/src/basango/services/persistence/base_persistor.py new file mode 100644 index 0000000..6385a43 --- /dev/null +++ b/projects/crawler/src/basango/services/persistence/base_persistor.py @@ -0,0 +1,16 @@ +from __future__ import annotations + 
+from abc import ABC, abstractmethod +from typing import Mapping, Any + + +class BasePersistor(ABC): + """Abstract interface for article persistence backends.""" + + @abstractmethod + def persist(self, article: Mapping[str, Any]) -> None: + """Persist a single article payload.""" + + def close(self) -> None: # pragma: no cover - optional override + """Hook for subclasses that need explicit shutdown.""" + return None diff --git a/projects/crawler/src/basango/services/persistence/csv_persistor.py b/projects/crawler/src/basango/services/persistence/csv_persistor.py new file mode 100644 index 0000000..0d5bed7 --- /dev/null +++ b/projects/crawler/src/basango/services/persistence/csv_persistor.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import csv +import json +from dataclasses import dataclass, field +from pathlib import Path +from threading import Lock +from typing import Any, Mapping, Sequence + +from .base_persistor import BasePersistor + + +DEFAULT_FIELDS = ( + "title", + "link", + "body", + "categories", + "source", + "timestamp", + "metadata", +) + + +@dataclass +class CsvPersistor(BasePersistor): + data_dir: Path + source_id: str + fieldnames: Sequence[str] = DEFAULT_FIELDS + encoding: str = "utf-8" + _file_path: Path = field(init=False, repr=False) + _lock: Lock = field(default_factory=Lock, init=False, repr=False) + _header_written: bool = field(default=False, init=False, repr=False) + + def __post_init__(self) -> None: + self.data_dir.mkdir(parents=True, exist_ok=True) + self._file_path = self.data_dir / f"{self.source_id}.csv" + if self._file_path.exists() and self._file_path.stat().st_size > 0: + self._header_written = True + + def persist(self, article: Mapping[str, Any]) -> None: + record = self._serialise(article) + with self._lock: + needs_header = not self._header_written or not self._file_path.exists() + with self._file_path.open( + "a", newline="", encoding=self.encoding + ) as handle: + writer = csv.DictWriter(handle, fieldnames=self.fieldnames) + if needs_header: + writer.writeheader() + self._header_written = True + writer.writerow(record) + + def _serialise(self, article: Mapping[str, Any]) -> dict[str, Any]: + categories = article.get("categories") + if isinstance(categories, (list, tuple)): + serialised_categories = ";".join(str(item) for item in categories) + else: + serialised_categories = categories + + metadata = article.get("metadata") + if metadata is None or isinstance(metadata, str): + serialised_metadata = metadata + else: + serialised_metadata = json.dumps(metadata, ensure_ascii=False) + + record = {field: article.get(field) for field in self.fieldnames} + record["categories"] = serialised_categories + record["metadata"] = serialised_metadata + return record diff --git a/projects/crawler/src/basango/services/persistence/json_persistor.py b/projects/crawler/src/basango/services/persistence/json_persistor.py new file mode 100644 index 0000000..ea16b67 --- /dev/null +++ b/projects/crawler/src/basango/services/persistence/json_persistor.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from pathlib import Path +from threading import Lock +from typing import Any, Mapping + +from .base_persistor import BasePersistor + + +@dataclass +class JsonPersistor(BasePersistor): + data_dir: Path + source_id: str + suffix: str = ".jsonl" + encoding: str = "utf-8" + _file_path: Path = field(init=False, repr=False) + _lock: Lock = field(default_factory=Lock, init=False, repr=False) + + def 
__post_init__(self) -> None: + self.data_dir.mkdir(parents=True, exist_ok=True) + self._file_path = self.data_dir / f"{self.source_id}{self.suffix}" + + def persist(self, article: Mapping[str, Any]) -> None: + payload = json.dumps(article, ensure_ascii=False) + with self._lock: + with self._file_path.open("a", encoding=self.encoding) as handle: + handle.write(payload) + handle.write("\n")
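
A few usage notes and sketches for the machinery above. The intended flow is: --async enqueues one listing job; collect_listing fans out one article job per discovered link; collect_article persists through the configured persistors and forwards the article to the processed queue. Note that, as written, _collect_html_article and _collect_wordpress_article always return None after persisting, so the `if article:` branch in collect_article never fires and the processed queue stays empty until those helpers are extended to return the article dict. Scheduling can also be done from Python rather than the CLI; a minimal sketch, assuming a Redis server at the default redis://localhost:6379/0 and a hypothetical source id ("example-news" and the "1-3" page-range syntax are illustrative, not from the patch):

    from basango.services.crawler.async_api import schedule_async_crawl

    job_id = schedule_async_crawl(
        source_id="example-news",  # hypothetical id from the pipeline config
        env="development",
        page_range="1-3",          # assumed PageRange.create() syntax
    )
    print(f"listing job queued: {job_id}")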
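
QueueSettings reads its defaults from environment variables at construction time (via default_factory), so deployments can retarget Redis or namespace their queues without code changes. A sketch showing both routes; the host and prefix values are illustrative. Redis.from_url does not connect eagerly, so this runs without a live server:

    import os

    from basango.services.crawler.async_api import QueueManager, QueueSettings

    # Route 1: environment variables, picked up by the default_factory fields.
    os.environ["BASANGO_REDIS_URL"] = "redis://cache.internal:6379/1"  # illustrative
    os.environ["BASANGO_QUEUE_PREFIX"] = "crawler-staging"

    manager = QueueManager()  # QueueSettings() is built from the env above
    print(list(manager.iter_queue_names()))
    # ['crawler-staging:listing', 'crawler-staging:articles', 'crawler-staging:processed']

    # Route 2: override the URL explicitly (prefix still comes from the env).
    manager = QueueManager(settings=QueueSettings(redis_url="redis://localhost:6379/0"))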
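
The payload dataclasses travel through RQ as plain dicts, and _coerce_kwargs makes from_dict tolerant of both extra and missing keys, which keeps already-queued jobs deserializable after a field is added. One caveat: a missing key deserializes to None, not to the declared field default (e.g. env). Round-trip sketch with a placeholder source id:

    from basango.services.crawler.async_api import ArticleTaskPayload

    payload = ArticleTaskPayload(source_id="demo", url="https://example.org/post")
    wire = payload.to_dict()  # the dict that is actually enqueued

    # Extra keys are ignored; absent keys come back as None, not the default:
    restored = ArticleTaskPayload.from_dict({**wire, "unexpected": 1})
    assert restored.url == "https://example.org/post"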
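
On the async_api.py indirection: `async` has been a reserved keyword since Python 3.7, so a plain import statement can never name the basango.services.crawler.async package; importlib resolves dotted paths as strings, which is why the shim works. Callers should import from basango.services.crawler.async_api, and renaming the package (e.g. to async_queue) would let the shim go away. The mechanism in miniature:

    from importlib import import_module

    # A plain import statement cannot name the keyword package:
    #   import basango.services.crawler.async.queue   # SyntaxError
    # importlib takes the dotted path as a string, so it resolves fine:
    queue_mod = import_module("basango.services.crawler.async.queue")
    QueueManager = queue_mod.QueueManager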
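
start_worker re-applies the queue prefix itself via queue_name(), so pass bare suffixes ("listing"), not prefixed names ("crawler:listing"). Also note the default: with no queue_names it listens only to the articles queue, so a bare `basango worker` never consumes the listing jobs that --async schedules; the CLI equivalent of the sketch below is `basango worker -q listing -q articles -q processed`. A sketch using only names from the patch:

    from basango.services.crawler.async_api import QueueSettings, start_worker

    settings = QueueSettings()
    start_worker(
        queue_names=[
            settings.listing_queue,    # "listing"
            settings.article_queue,    # "articles"
            settings.processed_queue,  # "processed"
        ],
        settings=settings,
        burst=True,  # drain what is queued and exit, e.g. in tests
    )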
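
BaseCrawler now fans each article out to every configured persistor instead of buffering in self.results; _persist() logs and swallows per-persistor failures so one bad sink cannot abort a crawl, and completed() closes the sinks via _shutdown_persistors(). Wiring more than one sink is just a longer list. A runnable sketch with placeholder data, shaped like the dict BaseCrawler builds before calling _persist():

    from pathlib import Path

    from basango.services import CsvPersistor, JsonPersistor

    persistors = [
        CsvPersistor(data_dir=Path("data"), source_id="demo"),   # writes data/demo.csv
        JsonPersistor(data_dir=Path("data"), source_id="demo"),  # writes data/demo.jsonl
    ]

    article = {
        "title": "Example title",
        "link": "https://example.org/post",
        "body": "Body text",
        "categories": ["news", "politics"],
        "source": "demo",
        "timestamp": "2025-10-06T00:00:00+00:00",
        "metadata": None,
    }
    for persistor in persistors:
        persistor.persist(article)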
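
ApiPersistor ships each article as JSON to an HTTP endpoint through the shared SyncHttpClient; on failure it logs and, with raise_for_status=True (the default), re-raises, which BaseCrawler._persist() then catches and logs again, so a flaky endpoint degrades to log noise rather than stopping the crawl. A wiring sketch; the endpoint URL and auth header are illustrative:

    from basango.core.config import ClientConfig
    from basango.services import ApiPersistor, SyncHttpClient

    persistor = ApiPersistor(
        endpoint="https://ingest.example.org/articles",  # illustrative endpoint
        http_client=SyncHttpClient(client_config=ClientConfig()),
        headers={"Authorization": "Bearer <token>"},     # placeholder credential
    )
    persistor.persist({"title": "Example", "link": "https://example.org/post"})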
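
JsonPersistor appends one JSON object per line (JSON Lines), guarded by a Lock for concurrent writers; CsvPersistor likewise appends, joining list-valued categories with ";" and JSON-encoding metadata so each record stays on a single CSV row. Reading the JSONL back is straightforward; "demo" is a placeholder source_id matching the {source_id}.jsonl naming in the patch:

    import json
    from pathlib import Path

    path = Path("data") / "demo.jsonl"  # "demo" is a placeholder source_id
    with path.open(encoding="utf-8") as handle:
        articles = [json.loads(line) for line in handle]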
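
Both HTTP clients inherit retry behavior from BaseHttpClient (transient statuses 429/500/502/503/504, exponential backoff, optional Retry-After handling); the async variant mirrors the sync call surface with awaitables. Prefer `async with` (or `await client.aclose()`) for shutdown, since its sync close() can only best-effort schedule aclose() when a loop is already running. A minimal sketch, assuming ClientConfig() is constructible with defaults, as open_graph.py already does:

    import asyncio

    from basango.core.config import ClientConfig
    from basango.services import AsyncHttpClient

    async def main() -> None:
        async with AsyncHttpClient(client_config=ClientConfig()) as client:
            response = await client.get("https://example.org")
            print(response.status_code)

    asyncio.run(main())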
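
On the backoff math: the base delay is backoff_initial * backoff_multiplier**attempt, capped at backoff_max, plus up to 25% uniform jitter; a numeric or HTTP-date Retry-After header takes precedence when respect_retry_after is set. A worked sketch of the deterministic part, with illustrative values (0.5 s initial, x2 multiplier, 30 s cap are assumptions, not the project's configured defaults):

    # delay(attempt) = base + uniform(0, 0.25 * base), where
    # base = min(backoff_initial * backoff_multiplier**attempt, backoff_max)
    backoff_initial, backoff_multiplier, backoff_max = 0.5, 2.0, 30.0
    for attempt in range(6):
        base = min(backoff_initial * backoff_multiplier**attempt, backoff_max)
        print(attempt, base)  # 0.5, 1.0, 2.0, 4.0, 8.0, 16.0 seconds before jitter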