[crawler]: add running instructions

2025-10-06 20:20:14 +02:00
parent e105ff233f
commit 327ba5179d
12 changed files with 189 additions and 8 deletions
+39 -4
@@ -1,11 +1,46 @@
# Crawler
[![Lint](https://github.com/bernard-ng/basango/actions/workflows/lint.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/lint.yml)
[![Tests](https://github.com/bernard-ng/basango/actions/workflows/test.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/test.yml)
[![Security](https://github.com/bernard-ng/basango/actions/workflows/security.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/security.yml)
[![crawler audit](https://github.com/bernard-ng/basango/actions/workflows/crawler_audit.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/crawler_audit.yml)
[![crawler quality](https://github.com/bernard-ng/basango/actions/workflows/crawler_quality.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/crawler_quality.yml)
[![crawler tests](https://github.com/bernard-ng/basango/actions/workflows/crawler_tests.yml/badge.svg)](https://github.com/bernard-ng/basango/actions/workflows/crawler_tests.yml)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
[![security: bandit](https://img.shields.io/badge/security-bandit-yellow.svg)](https://github.com/PyCQA/bandit)
---
### Get started
### Usage
- Install the project in your virtualenv so the `basango` CLI is available:
  - With uv: `uv run --with . basango --help`
  - Or install locally: `pip install -e .` then `basango --help`
#### Sync crawl (in-process)
- Crawl a configured source by id and write to CSV/JSON:
  - `basango crawl --source-id my-source`
  - Limit by page range: `basango crawl --source-id my-source -p 1:3`
  - Limit by date range: `basango crawl --source-id my-source -d 2024-10-01:2024-10-31`
  - Category, when supported: `basango crawl --source-id my-source -g tech`
#### Async crawl (Redis + RQ)
- Enqueue a crawl job and return immediately:
  - `basango crawl --source-id my-source --async`
- Start one or more workers to process queues:
  - Articles only (default): `basango worker`
  - Multiple queues: `basango worker -q listing -q articles -q processed`
  - macOS-friendly (no forking): `basango worker --simple`
  - One-shot draining for CI: `basango worker --burst`
#### Environment
- `BASANGO_REDIS_URL` (default `redis://localhost:6379/0`)
- `BASANGO_QUEUE_PREFIX` (default `crawler`)
- `BASANGO_QUEUE_TIMEOUT` (default `600` seconds)
- `BASANGO_QUEUE_RESULT_TTL` (default `3600` seconds)
- `BASANGO_QUEUE_FAILURE_TTL` (default `3600` seconds)
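A minimal sketch of reading these variables into a settings object, assuming only the names and defaults listed above (the real `QueueSettings` class in the codebase may differ):

```python
import os
from dataclasses import dataclass


@dataclass(slots=True)
class QueueEnv:
    """Hypothetical holder for the queue-related environment variables above."""

    redis_url: str
    prefix: str
    timeout: int
    result_ttl: int
    failure_ttl: int

    @classmethod
    def from_env(cls) -> "QueueEnv":
        # Defaults mirror the documented values.
        return cls(
            redis_url=os.getenv("BASANGO_REDIS_URL", "redis://localhost:6379/0"),
            prefix=os.getenv("BASANGO_QUEUE_PREFIX", "crawler"),
            timeout=int(os.getenv("BASANGO_QUEUE_TIMEOUT", "600")),
            result_ttl=int(os.getenv("BASANGO_QUEUE_RESULT_TTL", "3600")),
            failure_ttl=int(os.getenv("BASANGO_QUEUE_FAILURE_TTL", "3600")),
        )

    def queue_name(self, name: str) -> str:
        # Queues are namespaced per prefix, e.g. "crawler:articles".
        return f"{self.prefix}:{name}"
```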
#### Configuration
- See `config/pipeline.*.yaml` for source definitions and HTTP client settings.
- Use `-c/--env` to select which pipeline to load (default `development`).
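Programmatically, the same selection goes through `ConfigManager`, as the CLI commands in this commit do (the import path below is an assumption):

```python
from basango.config import ConfigManager  # import path assumed

manager = ConfigManager()
pipeline = manager.get("development")      # same value as -c/--env
manager.ensure_directories(pipeline)
print(pipeline.paths.data, pipeline.fetch.client)
```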
+38 -2
@@ -1,3 +1,23 @@
"""
CLI entry points for crawling and worker management.
Sync vs async usage
- Synchronous crawl: runs the selected crawler in-process and writes results
via configured persistors (CSV/JSON). Suitable for local development or
small runs.
- Asynchronous crawl: enqueues a listing job in Redis (RQ) and returns
immediately. One or more RQ workers must be running to process jobs.
Examples
- Sync: `basango crawl --source-id my-source --page 1:3`
- Async: `basango crawl --source-id my-source --async`
- Worker (macOS friendly): `basango worker --simple -q articles`
Environment
- `BASANGO_REDIS_URL` points the worker/queues to Redis.
- `BASANGO_QUEUE_PREFIX` namespaces queues (default: `crawler`).
"""
from typing import List, Optional
from enum import Enum
@@ -42,7 +62,15 @@ def crawl_cmd(
help="Schedule crawl through Redis queues instead of running synchronously.",
),
) -> None:
"""Crawl a single source, either synchronously or via the async queue."""
"""Crawl a single source, either synchronously or via the async queue.
Technical notes
- When `--async` is set, we only enqueue a job (no crawling happens here).
This keeps the CLI responsive and leaves fault-tolerance to RQ workers.
- Persistors (CSV/JSON) are instantiated only for the sync path; the async
path assigns them inside worker tasks to avoid importing heavy deps in the
CLI process and to better isolate failures.
"""
manager = ConfigManager()
pipeline = manager.get(env)
manager.ensure_directories(pipeline)
@@ -138,7 +166,15 @@ def worker_cmd(
help="Environment used to configure logging before starting the worker.",
),
) -> None:
"""Run an RQ worker that consumes crawler queues."""
"""Run an RQ worker that consumes crawler queues.
Notes
- By default the worker listens to the `articles` queue (detail jobs). Use
`-q listing -q articles -q processed` to listen to multiple.
- `--simple` uses RQ's SimpleWorker (no forking). On macOS this avoids
fork-related crashes when libraries aren't fork-safe.
- Use `--burst` to drain the queue and exit, useful for CI or one-off runs.
"""
manager = ConfigManager()
pipeline = manager.get(env)
manager.ensure_directories(pipeline)
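The notes above boil down to choosing between RQ's two worker classes; a minimal, self-contained sketch (queue name and Redis URL are the documented defaults, the wrapper function itself is hypothetical):

```python
from redis import Redis
from rq import Queue, SimpleWorker, Worker


def run_worker(
    queue_names: list[str],
    *,
    simple: bool = False,
    burst: bool = False,
    redis_url: str = "redis://localhost:6379/0",
) -> None:
    """Hypothetical wrapper mirroring the behaviour described in the docstring."""
    connection = Redis.from_url(redis_url)
    queues = [Queue(name, connection=connection) for name in queue_names]
    # SimpleWorker runs jobs in the main process (no fork), which avoids
    # fork-safety crashes on macOS; Worker is the default forking variant.
    worker_cls = SimpleWorker if simple else Worker
    worker = worker_cls(queues, connection=connection)
    # burst=True drains the queues and returns instead of blocking forever.
    worker.work(burst=burst)


run_worker(["crawler:articles"], simple=True, burst=True)
```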
@@ -1,3 +1,15 @@
"""
RQ queue configuration and helpers.
Design choices
- Queue names are prefixed (e.g. `crawler:articles`) so multiple environments
can share the same Redis. Configure via `BASANGO_QUEUE_PREFIX`.
- Job default timeouts and TTLs are centrally configured to avoid per-enqueue
tuning. Environment variables allow ops to adjust at runtime.
- Task callables are referenced by dotted string path when enqueuing to ensure
RQ workers can import them without importing this module and creating cycles.
"""
import os
from dataclasses import dataclass, field
from typing import Iterable
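A sketch of what enqueuing by dotted path with centrally configured job options looks like in plain RQ (the task module path and payload keys are assumptions; the numeric values are the documented defaults):

```python
from redis import Redis
from rq import Queue

connection = Redis.from_url("redis://localhost:6379/0")   # BASANGO_REDIS_URL
articles = Queue("crawler:articles", connection=connection)

# The callable is referenced by string, so only the worker imports the task
# module; this module never has to, which avoids import cycles.
job = articles.enqueue(
    "basango.services.crawler.async.tasks.collect_article",  # dotted path assumed
    {"source_id": "my-source", "env": "development"},
    job_timeout=600,     # BASANGO_QUEUE_TIMEOUT
    result_ttl=3600,     # BASANGO_QUEUE_RESULT_TTL
    failure_ttl=3600,    # BASANGO_QUEUE_FAILURE_TTL
)
print(job.id)
```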
@@ -1,3 +1,12 @@
"""
Lightweight task payload schemas.
Notes
- Use dataclasses with `slots=True` for low overhead and predictable fields.
- `_coerce_kwargs` filters unknown keys so payloads are resilient to schema
changes when workers and producers are not updated in lockstep.
"""
from dataclasses import asdict, dataclass, fields
from typing import Any, Mapping
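A compact illustration of that pattern (field names here are hypothetical; the real payload classes differ):

```python
from dataclasses import asdict, dataclass, fields
from typing import Any, Mapping


@dataclass(slots=True)
class ArticleTaskPayload:
    source_id: str
    env: str
    link: str | None = None


def coerce_kwargs(cls: type, data: Mapping[str, Any]) -> dict[str, Any]:
    # Drop keys the dataclass does not declare, so older workers still accept
    # payloads produced by newer producers (and vice versa).
    known = {f.name for f in fields(cls)}
    return {key: value for key, value in data.items() if key in known}


payload = ArticleTaskPayload(**coerce_kwargs(ArticleTaskPayload, {
    "source_id": "my-source",
    "env": "development",
    "link": "https://example.com/post",
    "added_later": "silently dropped",  # unknown key filtered out
}))
print(asdict(payload))
```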
@@ -1,3 +1,18 @@
"""
RQ task functions for the asynchronous crawl pipeline.
Pipeline
- schedule_async_crawl: seeds a listing job for a source
- collect_listing: enumerates listing pages and enqueues detail jobs
- collect_article: extracts and persists article data, then forwards
- forward_for_processing: hands the record to the downstream system (HTTP API)
Rationale
- Split listing vs article work to keep jobs small and retryable.
- Use ConfigManager to reconstruct the same pipeline/env in workers.
- Persist locally (CSV/JSON) before forwarding to decouple pipelines.
"""
import logging
from typing import Any
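Roughly, the listing stage fans out one small job per article; a hypothetical sketch of that shape (queue name, task path, and link extraction are placeholders):

```python
from redis import Redis
from rq import Queue


def collect_listing_sketch(payload: dict) -> int:
    """Hypothetical fan-out: one detail job per article link found on a listing page."""
    connection = Redis.from_url("redis://localhost:6379/0")
    articles = Queue("crawler:articles", connection=connection)
    links = ["https://example.com/posts/1", "https://example.com/posts/2"]  # stand-in for real extraction
    for link in links:
        # Small, independent jobs: a failed article fetch is retried on its own
        # without re-walking the whole listing.
        articles.enqueue(
            "basango.services.crawler.async.tasks.collect_article",
            {**payload, "link": link},
        )
    return len(links)
```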
@@ -28,6 +43,7 @@ def schedule_async_crawl(
category: str | None = None,
settings: QueueSettings | None = None,
):
# Keep payload serialisable and minimal; workers reconstruct config objects.
payload = ListingTaskPayload(
source_id=source_id,
env=env,
@@ -61,6 +77,8 @@ def collect_listing(payload: dict[str, Any]) -> int:
client_config = pipeline.fetch.client
queue_manager = QueueManager()
# Branch by source kind to reuse the same high-level flow with different
# extraction strategies.
if source.source_kind == SourceKind.HTML:
crawler = HtmlCrawler(crawler_config, client_config)
queued = _collect_html_listing(crawler, data, queue_manager)
@@ -95,6 +113,8 @@ def collect_article(payload: dict[str, Any]) -> dict[str, Any] | None:
)
source_identifier = getattr(source, "source_id", data.source_id) or data.source_id
# Persist locally first to keep an auditable trail and enable
# replay/recovery independent of downstream availability.
persistors = [
CsvPersistor(
data_dir=pipeline.paths.data,
@@ -161,6 +181,7 @@ def forward_for_processing(payload: dict[str, Any]) -> dict[str, Any] | None:
article.get("link"),
)
# TODO: externalise endpoint into config; hardcoded for now during bring-up.
persistor = ApiPersistor(
endpoint="http://localhost:8000/api/articles",
client_config=pipeline.fetch.client,
@@ -1,3 +1,11 @@
"""
Worker bootstrap for RQ queues.
Defaults to the `articles` queue to prioritise article detail processing.
`SimpleWorker` is exposed for environments where forking is unstable (e.g.,
some macOS setups). Use `burst=True` for CI or one-shot consumption.
"""
import logging
from typing import Sequence
@@ -1,3 +1,12 @@
"""
Thin indirection layer around async components (queues, tasks, worker).
We import symbols dynamically to avoid importing optional runtime dependencies
like RQ and Redis at module import time. This keeps regular (sync) crawling
usable even if async deps aren't installed, and avoids circular imports when
RQ workers import task callables by string path.
"""
from importlib import import_module
_async_queue = import_module("basango.services.crawler.async.queue")
@@ -12,6 +12,16 @@ from basango.services import HttpClient, DateParser, OpenGraphProvider, BasePers
class BaseCrawler(ABC):
"""
Base building blocks shared by concrete crawlers.
Notable conventions
- `skip`: raises `ArticleOutOfRange` when an item falls outside the desired
date range. Callers catch it to stop pagination early.
- `record_article`: normalises metadata (including dataclasses) before
handing off to persistors.
"""
def __init__(
self,
crawler_config: CrawlerConfig,
@@ -95,6 +105,8 @@ class BaseCrawler(ABC):
@classmethod
def skip(cls, date_range: DateRange, timestamp: str, title: str, date: str) -> None:
if date_range.out_range(int(timestamp)):
# Use an exception to unwind to the crawl loop and stop as soon as
# we detect items beyond the configured range.
raise ArticleOutOfRange.create(timestamp, date_range)
logging.warning(f"> {title} [Skipped {date}]")
@@ -15,6 +15,17 @@ from basango.services import BasePersistor
class HtmlCrawler(BaseCrawler):
"""
Generic HTML crawler driven by CSS selectors.
Strategy
- Listing pages are iterated to extract per-article links or blocks.
- When `requires_details` is set, a second request fetches the article page
to extract full content; otherwise the article block is parsed inline.
- Pagination is inferred from a template and last-page discovery heuristics
(regex or query string `page` fallback).
"""
def __init__(
self,
crawler_config: CrawlerConfig,
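Stripped of the project's configuration plumbing, the selector-driven strategy looks roughly like this (URL template and CSS selectors are invented for illustration):

```python
import requests
from bs4 import BeautifulSoup


def crawl_listing_sketch(page_url_template: str, last_page: int) -> list[dict]:
    """Hypothetical selector-driven listing walk, independent of BaseCrawler."""
    articles = []
    for page in range(1, last_page + 1):
        html = requests.get(page_url_template.format(page=page), timeout=30).text
        soup = BeautifulSoup(html, "html.parser")
        for block in soup.select("article.post"):        # listing block selector
            link = block.select_one("a.title")
            if link is None or not link.get("href"):
                continue
            articles.append({
                "title": link.get_text(strip=True),
                "link": link["href"],
                # with `requires_details`, a second request to link["href"]
                # would fetch the full article body here
            })
    return articles
```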
@@ -85,6 +96,8 @@ class HtmlCrawler(BaseCrawler):
self.fetch_one(target_html, date_range)
except ArticleOutOfRange:
# Using an exception to short-circuit nested loops keeps the
# happy path tidy (no extra flags at each extraction site).
logging.info("No more articles to fetch in this range.")
stop = True
break
@@ -161,7 +174,9 @@ class HtmlCrawler(BaseCrawler):
if not href or not isinstance(href, str):
return 1
# Extract number from href using regex or url parsing
# Heuristic: last pagination link either contains the page number
# directly or as a `page` query param. Prefer regex first to support
# path-style pagination (e.g., /page/4/).
match = re.search(r"(\d+)", href)
if match:
return int(match.group(1))
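The query-string fallback mentioned in the comment can be sketched in isolation like this (a standalone illustration, not the project's exact helper):

```python
import re
from urllib.parse import parse_qs, urlparse


def last_page_from_href(href: str) -> int:
    # Path-style pagination first (e.g. /page/4/), then ?page=4, else assume 1.
    match = re.search(r"(\d+)", href)
    if match:
        return int(match.group(1))
    page_values = parse_qs(urlparse(href).query).get("page")
    if page_values and page_values[0].isdigit():
        return int(page_values[0])
    return 1
```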
@@ -207,6 +222,8 @@ class HtmlCrawler(BaseCrawler):
if not target:
return None
# Support a few common attributes for link-like elements (href,
# data-href, src) to tolerate variations in markup without custom code.
raw_href = target.get("href") or target.get("data-href") or target.get("src")
href: Optional[str]
if isinstance(raw_href, str):
@@ -270,6 +287,8 @@ class HtmlCrawler(BaseCrawler):
if item.get_text(strip=True)
]
if parts:
# Join without separators: callers can post-process if
# needed, but this preserves maximum fidelity.
return "".join(parts)
return node.get_text(" ", strip=True)
@@ -13,6 +13,15 @@ from basango.services import BasePersistor
class WordpressCrawler(BaseCrawler):
"""
WordPress REST API crawler.
It uses the `/wp-json/wp/v2/posts` endpoints and limits fields to reduce
payload size. Pagination is driven by WordPress headers `x-wp-totalpages`
and `x-wp-total`. Category IDs are mapped to slugs via a secondary endpoint
and cached per run.
"""
def __init__(
self,
crawler_config: CrawlerConfig,
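A standalone sketch of the pagination described above, using the public WordPress REST API directly (the field list and page size are illustrative):

```python
import requests


def fetch_wordpress_posts(base_url: str, per_page: int = 100) -> list[dict]:
    """Walk /wp-json/wp/v2/posts, paginating via the x-wp-totalpages header."""
    endpoint = f"{base_url.rstrip('/')}/wp-json/wp/v2/posts"
    params = {
        "per_page": per_page,
        "_fields": "id,date,link,title,content,categories",  # trim payload size
        "page": 1,
    }
    first = requests.get(endpoint, params=params, timeout=30)
    first.raise_for_status()
    total_pages = int(first.headers.get("x-wp-totalpages", "1"))
    posts = list(first.json())
    for page in range(2, total_pages + 1):
        response = requests.get(endpoint, params={**params, "page": page}, timeout=30)
        response.raise_for_status()
        posts.extend(response.json())
    return posts
```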
@@ -58,6 +67,7 @@ class WordpressCrawler(BaseCrawler):
try:
self.fetch_one(article, date_range)
except ArticleOutOfRange:
# Same early-exit semantic as HtmlCrawler
logging.info("No more articles to fetch in this range.")
stop = True
break
@@ -18,6 +18,12 @@ class ApiPersistor(BasePersistor):
self.client = SyncHttpClient(client_config=self.client_config)
def persist(self, article: Mapping[str, Any]) -> None:
"""POST the article payload to an HTTP endpoint.
By default `raise_for_status` is enabled so worker failures surface
quickly. Disable when using a fire-and-forget pipeline and rely on
logs/monitoring instead.
"""
try:
response = self.client.post(self.endpoint, json=article)
if self.raise_for_status:
@@ -30,6 +30,8 @@ class CsvPersistor(BasePersistor):
_header_written: bool = field(default=False, init=False, repr=False)
def __post_init__(self) -> None:
# Pre-create output directory and detect existing header to avoid
# re-writing it across process restarts.
self.data_dir.mkdir(parents=True, exist_ok=True)
self._file_path = self.data_dir / f"{self.source_id}.csv"
if self._file_path.exists() and self._file_path.stat().st_size > 0:
@@ -37,6 +39,8 @@ class CsvPersistor(BasePersistor):
def persist(self, article: Mapping[str, Any]) -> None:
record = self._serialise(article)
# File writes are guarded by a process-local lock to tolerate threads
# sharing the same persistor instance.
with self._lock:
needs_header = not self._header_written or not self._file_path.exists()
with self._file_path.open(
@@ -64,7 +68,7 @@ class CsvPersistor(BasePersistor):
if metadata is None or isinstance(metadata, str):
serialised_metadata = metadata
else:
# JSON-encode metadata to a string that is CSV-safe; csv module will quote it
# JSON-encode metadata to a compact, CSV-safe string; csv will quote it.
serialised_metadata = json.dumps(
metadata, ensure_ascii=True, separators=(",", ":"), sort_keys=True
)
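For example, a nested metadata value collapses to a single ASCII string that the csv module can quote as one field (illustrative values):

```python
import json

metadata = {"tags": ["politics", "économie"], "author": {"name": "N. B."}}
print(json.dumps(metadata, ensure_ascii=True, separators=(",", ":"), sort_keys=True))
# {"author":{"name":"N. B."},"tags":["politics","\u00e9conomie"]}
```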