From 327ba5179ddbdc63624e0a3292a4cf799a294c55 Mon Sep 17 00:00:00 2001
From: bernard-ng
Date: Mon, 6 Oct 2025 20:20:14 +0200
Subject: [PATCH] [crawler]: add running instructions

---
 projects/crawler/README.md | 43 +++++++++++++++++--
 projects/crawler/src/basango/cli.py | 40 ++++++++++++++++-
 .../basango/services/crawler/async/queue.py | 12 ++++++
 .../basango/services/crawler/async/schemas.py | 9 ++++
 .../basango/services/crawler/async/tasks.py | 21 +++++++++
 .../basango/services/crawler/async/worker.py | 8 ++++
 .../src/basango/services/crawler/async_api.py | 9 ++++
 .../basango/services/crawler/base_crawler.py | 12 ++++++
 .../basango/services/crawler/html_crawler.py | 21 ++++++++-
 .../services/crawler/wordpress_crawler.py | 10 +++++
 .../services/persistence/api_persistor.py | 6 +++
 .../services/persistence/csv_persistor.py | 6 ++-
 12 files changed, 189 insertions(+), 8 deletions(-)

diff --git a/projects/crawler/README.md b/projects/crawler/README.md
index bc4cfb9..9af3938 100644
--- a/projects/crawler/README.md
+++ b/projects/crawler/README.md
@@ -1,11 +1,46 @@
 # Crawler

-[![Lint](https://github.com/bernard-ng/basango/actions/workflows/lint.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/lint.yml)
-[![Lint](https://github.com/bernard-ng/basango/actions/workflows/test.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/test.yml)
-[![Security](https://github.com/bernard-ng/basango/actions/workflows/security.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/security.yml)
+[![crawler audit](https://github.com/bernard-ng/basango/actions/workflows/crawler_audit.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/crawler_audit.yml)
+[![crawler quality](https://github.com/bernard-ng/basango/actions/workflows/crawler_quality.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/crawler_quality.yml)
+[![crawler tests](https://github.com/bernard-ng/basango/actions/workflows/crawler_tests.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/crawler_tests.yml)
 [![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
 [![security: bandit](https://img.shields.io/badge/security-bandit-yellow.svg)](https://github.com/PyCQA/bandit)

 ---

-### Get started
+### Usage
+
+- Install the project in your virtualenv so the `basango` CLI is available:
+  - With uv: `uv run --with . 
basango --help` + - Or install locally: `pip install -e .` then `basango --help` + +#### Sync crawl (in-process) + +- Crawl a configured source by id and write to CSV/JSON: + - `basango crawl --source-id my-source` + - Limit by page range: `basango crawl --source-id my-source -p 1:3` + - Limit by date range: `basango crawl --source-id my-source -d 2024-10-01:2024-10-31` + - Category, when supported: `basango crawl --source-id my-source -g tech` + +#### Async crawl (Redis + RQ) + +- Enqueue a crawl job and return immediately: + - `basango crawl --source-id my-source --async` +- Start one or more workers to process queues: + - Article-only (default): `basango worker` + - Multiple queues: `basango worker -q listing -q articles -q processed` + - macOS friendly (no forking): `basango worker --simple` + - One-shot draining for CI: `basango worker --burst` + +#### Environment + +- `BASANGO_REDIS_URL` (default `redis://localhost:6379/0`) +- `BASANGO_QUEUE_PREFIX` (default `crawler`) +- `BASANGO_QUEUE_TIMEOUT` (default `600` seconds) +- `BASANGO_QUEUE_RESULT_TTL` (default `3600` seconds) +- `BASANGO_QUEUE_FAILURE_TTL` (default `3600` seconds) + +#### Configuration + +- See `config/pipeline.*.yaml` for source definitions and HTTP client settings. +- Use `-c/--env` to select which pipeline to load (default `development`). diff --git a/projects/crawler/src/basango/cli.py b/projects/crawler/src/basango/cli.py index be401fe..5d18132 100644 --- a/projects/crawler/src/basango/cli.py +++ b/projects/crawler/src/basango/cli.py @@ -1,3 +1,23 @@ +""" +CLI entry points for crawling and worker management. + +Sync vs async usage +- Synchronous crawl: runs the selected crawler in-process and writes results + via configured persistors (CSV/JSON). Suitable for local development or + small runs. +- Asynchronous crawl: enqueues a listing job in Redis (RQ) and returns + immediately. One or more RQ workers must be running to process jobs. + +Examples +- Sync: `basango crawl --source-id my-source --page 1:3` +- Async: `basango crawl --source-id my-source --async` +- Worker (macOS friendly): `basango worker --simple -q articles` + +Environment +- `BASANGO_REDIS_URL` points the worker/queues to Redis. +- `BASANGO_QUEUE_PREFIX` namespaces queues (default: `crawler`). +""" + from typing import List, Optional from enum import Enum @@ -42,7 +62,15 @@ def crawl_cmd( help="Schedule crawl through Redis queues instead of running synchronously.", ), ) -> None: - """Crawl a single source, either synchronously or via the async queue.""" + """Crawl a single source, either synchronously or via the async queue. + + Technical notes + - When `--async` is set, we only enqueue a job (no crawling happens here). + This keeps the CLI responsive and leaves fault-tolerance to RQ workers. + - Persistors (CSV/JSON) are instantiated only for the sync path; the async + path assigns them inside worker tasks to avoid importing heavy deps in the + CLI process and to better isolate failures. + """ manager = ConfigManager() pipeline = manager.get(env) manager.ensure_directories(pipeline) @@ -138,7 +166,15 @@ def worker_cmd( help="Environment used to configure logging before starting the worker.", ), ) -> None: - """Run an RQ worker that consumes crawler queues.""" + """Run an RQ worker that consumes crawler queues. + + Notes + - By default the worker listens to the `articles` queue (detail jobs). Use + `-q listing -q articles -q processed` to listen to multiple. + - `--simple` uses RQ's SimpleWorker (no forking). 
On macOS this avoids + fork-related crashes when libraries aren't fork-safe. + - Use `--burst` to drain the queue and exit, useful for CI or one-off runs. + """ manager = ConfigManager() pipeline = manager.get(env) manager.ensure_directories(pipeline) diff --git a/projects/crawler/src/basango/services/crawler/async/queue.py b/projects/crawler/src/basango/services/crawler/async/queue.py index adb69a3..efd9084 100644 --- a/projects/crawler/src/basango/services/crawler/async/queue.py +++ b/projects/crawler/src/basango/services/crawler/async/queue.py @@ -1,3 +1,15 @@ +""" +RQ queue configuration and helpers. + +Design choices +- Queue names are prefixed (e.g. `crawler:articles`) so multiple environments + can share the same Redis. Configure via `BASANGO_QUEUE_PREFIX`. +- Job default timeouts and TTLs are centrally configured to avoid per-enqueue + tuning. Environment variables allow ops to adjust at runtime. +- Task callables are referenced by dotted string path when enqueuing to ensure + RQ workers can import them without importing this module and creating cycles. +""" + import os from dataclasses import dataclass, field from typing import Iterable diff --git a/projects/crawler/src/basango/services/crawler/async/schemas.py b/projects/crawler/src/basango/services/crawler/async/schemas.py index 2196a7b..43be121 100644 --- a/projects/crawler/src/basango/services/crawler/async/schemas.py +++ b/projects/crawler/src/basango/services/crawler/async/schemas.py @@ -1,3 +1,12 @@ +""" +Lightweight task payload schemas. + +Notes +- Use dataclasses with `slots=True` for low overhead and predictable fields. +- `_coerce_kwargs` filters unknown keys so payloads are resilient to schema + changes when workers and producers are not updated in lockstep. +""" + from dataclasses import asdict, dataclass, fields from typing import Any, Mapping diff --git a/projects/crawler/src/basango/services/crawler/async/tasks.py b/projects/crawler/src/basango/services/crawler/async/tasks.py index 9e20916..135a9e5 100644 --- a/projects/crawler/src/basango/services/crawler/async/tasks.py +++ b/projects/crawler/src/basango/services/crawler/async/tasks.py @@ -1,3 +1,18 @@ +""" +RQ task functions for the asynchronous crawl pipeline. + +Pipeline +- schedule_async_crawl: seeds a listing job for a source +- collect_listing: enumerates listing pages and enqueues detail jobs +- collect_article: extracts and persists article data, then forwards +- forward_for_processing: hands the record to downstream system (HTTP API) + +Rationale +- Split listing vs article work to keep jobs small and retryable. +- Use ConfigManager to reconstruct the same pipeline/env in workers. +- Persist locally (CSV/JSON) before forwarding to decouple pipelines. +""" + import logging from typing import Any @@ -28,6 +43,7 @@ def schedule_async_crawl( category: str | None = None, settings: QueueSettings | None = None, ): + # Keep payload serialisable and minimal; workers reconstruct config objects. payload = ListingTaskPayload( source_id=source_id, env=env, @@ -61,6 +77,8 @@ def collect_listing(payload: dict[str, Any]) -> int: client_config = pipeline.fetch.client queue_manager = QueueManager() + # Branch by source kind to reuse the same high-level flow with different + # extraction strategies. 
if source.source_kind == SourceKind.HTML: crawler = HtmlCrawler(crawler_config, client_config) queued = _collect_html_listing(crawler, data, queue_manager) @@ -95,6 +113,8 @@ def collect_article(payload: dict[str, Any]) -> dict[str, Any] | None: ) source_identifier = getattr(source, "source_id", data.source_id) or data.source_id + # Persist locally first to keep an auditable trail and enable + # replay/recovery independent of downstream availability. persistors = [ CsvPersistor( data_dir=pipeline.paths.data, @@ -161,6 +181,7 @@ def forward_for_processing(payload: dict[str, Any]) -> dict[str, Any] | None: article.get("link"), ) + # TODO: externalise endpoint into config; hardcoded for now during bring-up. persistor = ApiPersistor( endpoint="http://localhost:8000/api/articles", client_config=pipeline.fetch.client, diff --git a/projects/crawler/src/basango/services/crawler/async/worker.py b/projects/crawler/src/basango/services/crawler/async/worker.py index b9c15db..0903c9b 100644 --- a/projects/crawler/src/basango/services/crawler/async/worker.py +++ b/projects/crawler/src/basango/services/crawler/async/worker.py @@ -1,3 +1,11 @@ +""" +Worker bootstrap for RQ queues. + +Defaults to the `articles` queue to prioritise article detail processing. +`SimpleWorker` is exposed for environments where forking is unstable (e.g., +some macOS setups). Use `burst=True` for CI or one-shot consumption. +""" + import logging from typing import Sequence diff --git a/projects/crawler/src/basango/services/crawler/async_api.py b/projects/crawler/src/basango/services/crawler/async_api.py index 4406dc3..7b98ddc 100644 --- a/projects/crawler/src/basango/services/crawler/async_api.py +++ b/projects/crawler/src/basango/services/crawler/async_api.py @@ -1,3 +1,12 @@ +""" +Thin indirection layer around async components (queues, tasks, worker). + +We import symbols dynamically to avoid importing optional runtime dependencies +like RQ and Redis at module import time. This keeps regular (sync) crawling +usable even if async deps aren't installed, and avoids circular imports when +RQ workers import task callables by string path. +""" + from importlib import import_module _async_queue = import_module("basango.services.crawler.async.queue") diff --git a/projects/crawler/src/basango/services/crawler/base_crawler.py b/projects/crawler/src/basango/services/crawler/base_crawler.py index a489706..3f2c65b 100644 --- a/projects/crawler/src/basango/services/crawler/base_crawler.py +++ b/projects/crawler/src/basango/services/crawler/base_crawler.py @@ -12,6 +12,16 @@ from basango.services import HttpClient, DateParser, OpenGraphProvider, BasePers class BaseCrawler(ABC): + """ + Base building blocks shared by concrete crawlers. + + Notable conventions + - `skip`: raises `ArticleOutOfRange` when an item falls outside the desired + date range. Callers catch it to stop pagination early. + - `record_article`: normalises metadata (including dataclasses) before + handing off to persistors. + """ + def __init__( self, crawler_config: CrawlerConfig, @@ -95,6 +105,8 @@ class BaseCrawler(ABC): @classmethod def skip(cls, date_range: DateRange, timestamp: str, title: str, date: str) -> None: if date_range.out_range(int(timestamp)): + # Use an exception to unwind to the crawl loop and stop as soon as + # we detect items beyond the configured range. 
raise ArticleOutOfRange.create(timestamp, date_range) logging.warning(f"> {title} [Skipped {date}]") diff --git a/projects/crawler/src/basango/services/crawler/html_crawler.py b/projects/crawler/src/basango/services/crawler/html_crawler.py index 271757e..4d4837e 100644 --- a/projects/crawler/src/basango/services/crawler/html_crawler.py +++ b/projects/crawler/src/basango/services/crawler/html_crawler.py @@ -15,6 +15,17 @@ from basango.services import BasePersistor class HtmlCrawler(BaseCrawler): + """ + Generic HTML crawler driven by CSS selectors. + + Strategy + - Listing pages are iterated to extract per-article links or blocks. + - When `requires_details` is set, a second request fetches the article page + to extract full content; otherwise the article block is parsed inline. + - Pagination is inferred from a template and last-page discovery heuristics + (regex or query string `page` fallback). + """ + def __init__( self, crawler_config: CrawlerConfig, @@ -85,6 +96,8 @@ class HtmlCrawler(BaseCrawler): self.fetch_one(target_html, date_range) except ArticleOutOfRange: + # Using an exception to short-circuit nested loops keeps the + # happy path tidy (no extra flags at each extraction site). logging.info("No more articles to fetch in this range.") stop = True break @@ -161,7 +174,9 @@ class HtmlCrawler(BaseCrawler): if not href or not isinstance(href, str): return 1 - # Extract number from href using regex or url parsing + # Heuristic: last pagination link either contains the page number + # directly or as a `page` query param. Prefer regex first to support + # path-style pagination (e.g., /page/4/). match = re.search(r"(\d+)", href) if match: return int(match.group(1)) @@ -207,6 +222,8 @@ class HtmlCrawler(BaseCrawler): if not target: return None + # Support a few common attributes for link-like elements (href, + # data-href, src) to tolerate variations in markup without custom code. raw_href = target.get("href") or target.get("data-href") or target.get("src") href: Optional[str] if isinstance(raw_href, str): @@ -270,6 +287,8 @@ class HtmlCrawler(BaseCrawler): if item.get_text(strip=True) ] if parts: + # Join without separators: callers can post-process if + # needed, but this preserves maximum fidelity. return "".join(parts) return node.get_text(" ", strip=True) diff --git a/projects/crawler/src/basango/services/crawler/wordpress_crawler.py b/projects/crawler/src/basango/services/crawler/wordpress_crawler.py index ebe40c0..8de2920 100644 --- a/projects/crawler/src/basango/services/crawler/wordpress_crawler.py +++ b/projects/crawler/src/basango/services/crawler/wordpress_crawler.py @@ -13,6 +13,15 @@ from basango.services import BasePersistor class WordpressCrawler(BaseCrawler): + """ + WordPress REST API crawler. + + It uses the `/wp-json/wp/v2/posts` endpoints and limits fields to reduce + payload size. Pagination is driven by WordPress headers `x-wp-totalpages` + and `x-wp-total`. Category IDs are mapped to slugs via a secondary endpoint + and cached per run. 
+ """ + def __init__( self, crawler_config: CrawlerConfig, @@ -58,6 +67,7 @@ class WordpressCrawler(BaseCrawler): try: self.fetch_one(article, date_range) except ArticleOutOfRange: + # Same early-exit semantic as HtmlCrawler logging.info("No more articles to fetch in this range.") stop = True break diff --git a/projects/crawler/src/basango/services/persistence/api_persistor.py b/projects/crawler/src/basango/services/persistence/api_persistor.py index 65b5e44..2326126 100644 --- a/projects/crawler/src/basango/services/persistence/api_persistor.py +++ b/projects/crawler/src/basango/services/persistence/api_persistor.py @@ -18,6 +18,12 @@ class ApiPersistor(BasePersistor): self.client = SyncHttpClient(client_config=self.client_config) def persist(self, article: Mapping[str, Any]) -> None: + """POST the article payload to an HTTP endpoint. + + By default `raise_for_status` is enabled so worker failures surface + quickly. Disable when using a fire-and-forget pipeline and rely on + logs/monitoring instead. + """ try: response = self.client.post(self.endpoint, json=article) if self.raise_for_status: diff --git a/projects/crawler/src/basango/services/persistence/csv_persistor.py b/projects/crawler/src/basango/services/persistence/csv_persistor.py index 0f8b2d2..36daaf4 100644 --- a/projects/crawler/src/basango/services/persistence/csv_persistor.py +++ b/projects/crawler/src/basango/services/persistence/csv_persistor.py @@ -30,6 +30,8 @@ class CsvPersistor(BasePersistor): _header_written: bool = field(default=False, init=False, repr=False) def __post_init__(self) -> None: + # Pre-create output directory and detect existing header to avoid + # re-writing it across process restarts. self.data_dir.mkdir(parents=True, exist_ok=True) self._file_path = self.data_dir / f"{self.source_id}.csv" if self._file_path.exists() and self._file_path.stat().st_size > 0: @@ -37,6 +39,8 @@ class CsvPersistor(BasePersistor): def persist(self, article: Mapping[str, Any]) -> None: record = self._serialise(article) + # File writes are guarded by a process-local lock to tolerate threads + # sharing the same persistor instance. with self._lock: needs_header = not self._header_written or not self._file_path.exists() with self._file_path.open( @@ -64,7 +68,7 @@ class CsvPersistor(BasePersistor): if metadata is None or isinstance(metadata, str): serialised_metadata = metadata else: - # JSON-encode metadata to a string that is CSV-safe; csv module will quote it + # JSON-encode metadata to a compact, CSV-safe string; csv will quote it. serialised_metadata = json.dumps( metadata, ensure_ascii=True, separators=(",", ":"), sort_keys=True )
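---

The queue setup described in `async/queue.py` above (prefixed queue names, env-driven timeouts and TTLs, task callables referenced by dotted path) can be illustrated with plain RQ. This is a minimal sketch, not the project's actual helper: `make_queue` and `enqueue_listing` are hypothetical names, while the environment variables, the `crawler` prefix, and the `collect_listing` dotted path come from the patch itself.

```python
import os

from redis import Redis
from rq import Queue


def make_queue(name: str) -> Queue:
    """Build an RQ queue namespaced by BASANGO_QUEUE_PREFIX (default: crawler)."""
    prefix = os.getenv("BASANGO_QUEUE_PREFIX", "crawler")
    redis = Redis.from_url(os.getenv("BASANGO_REDIS_URL", "redis://localhost:6379/0"))
    return Queue(f"{prefix}:{name}", connection=redis)


def enqueue_listing(payload: dict) -> str:
    """Enqueue a listing job; the worker, not the producer, imports the callable."""
    job = make_queue("listing").enqueue(
        # Dotted path taken from the patch; RQ resolves it inside the worker,
        # so the CLI process never imports the task module (avoids cycles).
        "basango.services.crawler.async.tasks.collect_listing",
        payload,
        job_timeout=int(os.getenv("BASANGO_QUEUE_TIMEOUT", "600")),
        result_ttl=int(os.getenv("BASANGO_QUEUE_RESULT_TTL", "3600")),
        failure_ttl=int(os.getenv("BASANGO_QUEUE_FAILURE_TTL", "3600")),
    )
    return job.id


if __name__ == "__main__":
    print(enqueue_listing({"source_id": "my-source", "env": "development", "page": "1:3"}))
```

Passing the callable as a dotted string is what lets the CLI stay lightweight: the producer only serialises a payload, and the import (with its heavier dependencies) happens in the worker process.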
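On the worker side, `--simple` and `--burst` in `worker_cmd` map onto RQ's `SimpleWorker` class and `work(burst=True)`. The sketch below shows that mapping under the same assumptions; `start_worker` is an illustrative helper, not the real bootstrap in `async/worker.py`.

```python
import os
from typing import Sequence

from redis import Redis
from rq import Queue
from rq.worker import SimpleWorker, Worker


def start_worker(
    queues: Sequence[str] = ("articles",),
    simple: bool = False,
    burst: bool = False,
) -> None:
    """Consume prefixed crawler queues until stopped (or drained, when burst=True)."""
    prefix = os.getenv("BASANGO_QUEUE_PREFIX", "crawler")
    redis = Redis.from_url(os.getenv("BASANGO_REDIS_URL", "redis://localhost:6379/0"))
    rq_queues = [Queue(f"{prefix}:{name}", connection=redis) for name in queues]
    # SimpleWorker runs jobs in-process (no fork), which sidesteps fork-safety
    # problems on macOS; the default Worker forks a work horse per job.
    worker_cls = SimpleWorker if simple else Worker
    worker_cls(rq_queues, connection=redis).work(burst=burst)


if __name__ == "__main__":
    start_worker(("listing", "articles", "processed"), simple=True, burst=True)
```

`burst=True` returns as soon as the listened queues are empty, which is the CI / one-shot draining case the CLI help text mentions.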
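The payload-coercion idea in `async/schemas.py` (slotted dataclasses whose constructors ignore unknown keys) looks roughly like the following. `ListingTaskPayload` is a name from the patch, but the field list and the `from_mapping` helper here are guesses for illustration only; the real schema may differ.

```python
from dataclasses import asdict, dataclass, fields
from typing import Any, Mapping


@dataclass(slots=True)
class ListingTaskPayload:
    source_id: str
    env: str = "development"
    page: str | None = None       # e.g. "1:3"
    date: str | None = None       # e.g. "2024-10-01:2024-10-31"
    category: str | None = None

    @classmethod
    def from_mapping(cls, payload: Mapping[str, Any]) -> "ListingTaskPayload":
        # Drop unknown keys so an older worker can still read payloads produced
        # by a newer CLI (and vice versa) without raising TypeError.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in payload.items() if k in known})


raw = {"source_id": "my-source", "page": "1:3", "field_added_later": 42}
print(asdict(ListingTaskPayload.from_mapping(raw)))
```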
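`BaseCrawler.skip` and the crawlers' listing loops rely on an exception to stop pagination once articles fall outside the requested date range. Below is a self-contained illustration of that control flow only: `DateRange` handling is reduced to plain integers and the exception lacks the real class's `create` factory.

```python
class ArticleOutOfRange(Exception):
    """Raised when an article's timestamp falls outside the requested date range."""


def out_of_range(timestamp: int, start: int, end: int) -> bool:
    return timestamp < start or timestamp > end


def crawl(listing_pages: list[list[int]], start: int, end: int) -> list[int]:
    collected: list[int] = []
    try:
        for page in listing_pages:          # listing pages, newest first
            for timestamp in page:          # article timestamps on one page
                if out_of_range(timestamp, start, end):
                    # Unwind both loops at once instead of threading a stop
                    # flag through every extraction site.
                    raise ArticleOutOfRange(timestamp)
                collected.append(timestamp)
    except ArticleOutOfRange:
        pass  # nothing in range remains; stop paginating early
    return collected


print(crawl([[130, 120], [110, 90]], start=100, end=140))  # [130, 120, 110]
```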
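Finally, the last-page heuristic documented in `HtmlCrawler` (a regex over the last pagination link's href, with a `page` query-string fallback) can be sketched as below. `last_page_from_href` is a hypothetical standalone version of that logic, and its regex is deliberately narrower than the patch's generic `(\d+)` so that the query-string branch is actually exercised.

```python
import re
from urllib.parse import parse_qs, urlparse


def last_page_from_href(href: str | None) -> int:
    """Best-effort page count taken from the last pagination link's href."""
    if not href:
        return 1
    # Path-style pagination first, e.g. /category/tech/page/4/
    match = re.search(r"/(\d+)/?$", href)
    if match:
        return int(match.group(1))
    # Fallback: query-string pagination, e.g. ?page=3
    page_values = parse_qs(urlparse(href).query).get("page", [])
    if page_values and page_values[0].isdigit():
        return int(page_values[0])
    return 1


assert last_page_from_href("/category/tech/page/4/") == 4
assert last_page_from_href("https://example.com/news?page=3") == 3
assert last_page_from_href(None) == 1
```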