[backend] accept articles from crawler

2025-10-07 00:15:38 +02:00
parent 327ba5179d
commit 449718cdf2
397 changed files with 1490 additions and 3063 deletions
+7 -2
@@ -1,2 +1,7 @@
BASANGO_CRAWLER_TOKEN=some-token
BASANGO_API_ENDPOINT=http://localhost:8000/api/aggregator/articles?token=dev
BASANGO_REDIS_URL=redis://localhost:6379/0
BASANGO_QUEUE_PREFIX=basango
BASANGO_QUEUE_TIMEOUT=30
BASANGO_QUEUE_RESULT_TTL=3600
BASANGO_QUEUE_FAILURE_TTL=86400
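Note: the queue settings above presumably feed the QueueSettings imported in the CLI below, and BASANGO_API_ENDPOINT is read with os.getenv in tasks.py further down. A minimal sketch of how a worker might resolve them; the int() coercions and names on the left are illustrative, only the env var names and the endpoint default come from this commit:

import os

api_endpoint = os.getenv(
    "BASANGO_API_ENDPOINT",
    "http://localhost:8000/api/aggregator/articles?token=dev",
)
queue_timeout = int(os.getenv("BASANGO_QUEUE_TIMEOUT", "30"))       # seconds
result_ttl = int(os.getenv("BASANGO_QUEUE_RESULT_TTL", "3600"))     # keep results 1 h
failure_ttl = int(os.getenv("BASANGO_QUEUE_FAILURE_TTL", "86400"))  # keep failures 1 day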
+1 -5
@@ -26,7 +26,7 @@ import typer
from basango.core.config import CrawlerConfig
from basango.core.config_manager import ConfigManager
from basango.domain import DateRange, PageRange, UpdateDirection
from basango.services import CsvPersistor, JsonPersistor
from basango.services import JsonPersistor
from basango.services.crawler.async_api import (
QueueSettings,
schedule_async_crawl,
@@ -109,10 +109,6 @@ def crawl_cmd(
source_identifier = getattr(source, "source_id", source_id) or source_id
persistors = [
CsvPersistor(
data_dir=pipeline.paths.data,
source_id=str(source_identifier),
),
JsonPersistor(
data_dir=pipeline.paths.data,
source_id=str(source_identifier),
@@ -12,3 +12,14 @@ class Article(BaseModel):
source: str
timestamp: datetime
metadata: Optional[dict[str, Any]] = None
def to_dict(self) -> dict[str, Any]:
return {
"title": self.title,
"link": str(self.link),
"body": self.body,
"categories": self.categories,
"source": self.source,
"timestamp": int(self.timestamp.timestamp()),
"metadata": self.metadata,
}
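Note: the one transformation in to_dict is the timestamp, flattened to epoch seconds for the wire format. A round-trip sketch; the pydantic field types are not shown in this hunk, so the literal values are assumptions:

from datetime import datetime, timezone

article = Article(
    title="Example",
    link="https://example.org/post",
    body="Body text",
    categories=["news"],
    source="example",
    timestamp=datetime(2025, 10, 7, tzinfo=timezone.utc),
)
payload = article.to_dict()
assert payload["timestamp"] == 1759795200  # epoch seconds (2025-10-07 00:00 UTC)
# link is passed through str(), so it stays a plain URL string even if the field is an HttpUrl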
@@ -1,7 +1,7 @@
from .date_parser import DateParser
from .http_client import BaseHttpClient, SyncHttpClient, AsyncHttpClient
from .open_graph import OpenGraphProvider
from .persistence import BasePersistor, CsvPersistor, JsonPersistor, ApiPersistor
from .persistence import BasePersistor, CsvPersistor, JsonPersistor
from .user_agents import UserAgentProvider
HttpClient = SyncHttpClient
@@ -17,5 +17,4 @@ __all__ = [
"BasePersistor",
"CsvPersistor",
"JsonPersistor",
"ApiPersistor",
]
@@ -10,6 +10,8 @@ Notes
from dataclasses import asdict, dataclass, fields
from typing import Any, Mapping
from basango.domain.article import Article
def _coerce_kwargs(cls, data: Mapping[str, Any]) -> dict[str, Any]:
return {field.name: data.get(field.name) for field in fields(cls)}
@@ -51,8 +53,8 @@ class ArticleTaskPayload:
@dataclass(slots=True)
class ProcessedTaskPayload:
source_id: str
article: Article
env: str = "development"
article: Mapping[str, Any] | None = None
def to_dict(self) -> dict[str, Any]:
return asdict(self)
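Note: to_dict relies on dataclasses.asdict, which only expands dataclass fields, so the pydantic Article rides along as a deep-copied object rather than a serialized dict; actual JSON serialization happens later via article.to_dict() in forward_for_processing. A round-trip sketch, assuming from_dict delegates to _coerce_kwargs as above:

payload = ProcessedTaskPayload(source_id="example", article=article)
raw = payload.to_dict()                       # {"source_id": ..., "article": <Article>, "env": ...}
restored = ProcessedTaskPayload.from_dict(raw)
assert restored.article.link == article.link  # deep copy, equal fields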
@@ -13,13 +13,16 @@ Rationale
- Persist locally (CSV/JSON) before forwarding to decouple pipelines.
"""
import os
import logging
from typing import Any
from basango.domain.article import Article
from basango.services import SyncHttpClient
from basango.core.config import CrawlerConfig
from basango.core.config_manager import ConfigManager
from basango.domain import DateRange, PageRange, SourceKind, UpdateDirection
from basango.services import CsvPersistor, JsonPersistor, ApiPersistor
from basango.services import JsonPersistor
from basango.services.crawler.html_crawler import HtmlCrawler
from basango.services.crawler.wordpress_crawler import WordpressCrawler
@@ -95,7 +98,7 @@ def collect_listing(payload: dict[str, Any]) -> int:
return queued
def collect_article(payload: dict[str, Any]) -> dict[str, Any] | None:
def collect_article(payload: dict[str, Any]) -> Article | None:
data = ArticleTaskPayload.from_dict(payload)
manager = ConfigManager()
pipeline = manager.get(data.env)
@@ -112,82 +115,85 @@ def collect_article(payload: dict[str, Any]) -> dict[str, Any] | None:
direction=UpdateDirection.FORWARD,
)
source_identifier = getattr(source, "source_id", data.source_id) or data.source_id
# Persist locally first to keep an auditable trail and enable
# replay/recovery independent of downstream availability.
persistors = [
CsvPersistor(
data_dir=pipeline.paths.data,
source_id=str(source_identifier),
),
JsonPersistor(
data_dir=pipeline.paths.data,
source_id=str(source_identifier),
source_id=str(source.source_id),
),
]
    queue_manager = QueueManager()
    try:
        if source.source_kind == SourceKind.HTML:
            article = _collect_html_article(
                HtmlCrawler(
                    crawler_config, pipeline.fetch.client, persistors=persistors
                ),
                data,
            )
        else:
            article = _collect_wordpress_article(
                WordpressCrawler(
                    crawler_config, pipeline.fetch.client, persistors=persistors
                ),
                data,
            )
        elif source.source_kind == SourceKind.WORDPRESS:
            article = _collect_wordpress_article(
                WordpressCrawler(
                    crawler_config, pipeline.fetch.client, persistors=persistors
                ),
                data,
            )
        else:
            logger.warning(
                "Async crawling not supported for source kind %s", source.source_kind
            )
            article = None
        queue_manager.enqueue_processed(
            ProcessedTaskPayload(
                source_id=data.source_id,
                env=data.env,
                article=article,
            )
        )
        if article:
            logger.info(
                "Persisted article %s and forwarded to processed queue",
                article.get("link"),
            )
            logger.info(
                "Persisted article %s and forwarded to processed queue", article.link
            )
        else:
            logger.info("Persisted article and forwarded to processed queue")
        return article
except Exception as exc: # noqa: BLE001
logger.error(
"Failed to collect article for source %s url %s: %s",
data.source_id,
data.url,
exc,
)
return None
def forward_for_processing(payload: dict[str, Any]) -> dict[str, Any] | None:
def forward_for_processing(payload: dict[str, Any]) -> Article | None:
data = ProcessedTaskPayload.from_dict(payload)
manager = ConfigManager()
pipeline = manager.get(data.env)
    article = dict(data.article) if data.article is not None else None
    article = data.article
    if article is None:
        logger.info(
            "Ready for downstream processing: source=%s (no article)", data.source_id
        )
        return None
logger.info(
"Ready for downstream processing: source=%s link=%s",
data.source_id,
article.get("link"),
article.link,
)
# TODO: externalise endpoint into config; hardcoded for now during bring-up.
persistor = ApiPersistor(
endpoint="http://localhost:8000/api/articles",
client_config=pipeline.fetch.client,
)
persistor.persist(article)
logger.info("Forwarded article %s to API", article.get("link"))
try:
client = SyncHttpClient(client_config=pipeline.fetch.client)
client.post(
os.getenv(
"BASANGO_API_ENDPOINT",
"http://localhost:8000/api/aggregator/articles?token=dev",
),
json=article.to_dict(),
)
logger.info("Forwarded article %s to API", article.link)
return article
except Exception as exc: # noqa: BLE001
logger.error(
"Failed to forward article %s to API: %s",
article.link,
exc,
)
return None
def _collect_html_listing(
@@ -273,31 +279,27 @@ def _collect_wordpress_listing(
def _collect_html_article(
crawler: HtmlCrawler,
payload: ArticleTaskPayload,
) -> dict[str, Any] | None:
) -> Article:
if not payload.url:
logger.warning("Missing article url for HTML source %s", payload.source_id)
return None
raise ValueError("Missing article url")
crawler._current_article_url = payload.url # type: ignore[attr-defined]
try:
soup = crawler.crawl(payload.url)
except Exception as exc: # noqa: BLE001
logger.exception("Failed to crawl article %s: %s", payload.url, exc)
return None
raise exc
crawler.fetch_one(str(soup), crawler.config.date_range)
crawler.completed(False)
return None
return crawler.fetch_one(str(soup), crawler.config.date_range)
def _collect_wordpress_article(
crawler: WordpressCrawler,
payload: ArticleTaskPayload,
) -> dict[str, Any] | None:
) -> Article:
if payload.data is None:
logger.warning("Missing WordPress payload for source %s", payload.source_id)
return None
raise ValueError("Missing WordPress payload")
crawler.fetch_one(payload.data, crawler.config.date_range)
crawler.completed(False)
return None
return crawler.fetch_one(payload.data, crawler.config.date_range)
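Note: taken together, the flow in this file is now: collect_article crawls, persists through JsonPersistor, returns the domain Article, and enqueues it; forward_for_processing later POSTs article.to_dict() to the aggregator endpoint. Driven by hand for illustration only (in practice the queue workers supply the payload dicts, shaped by the task dataclasses above):

article = collect_article(
    {"source_id": "example", "url": "https://example.org/post", "env": "development"}
)
if article is not None:
    forward_for_processing(
        {"source_id": "example", "env": "development", "article": article}
    )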
@@ -3,6 +3,7 @@ from abc import ABC, abstractmethod
from dataclasses import asdict, is_dataclass
from typing import Optional, Any, Dict, List, Sequence
from basango.domain.article import Article
from bs4 import BeautifulSoup
from basango.core.config import CrawlerConfig, ClientConfig
@@ -46,7 +47,7 @@ class BaseCrawler(ABC):
response = self.client.get(url).text
return BeautifulSoup(response, "html.parser")
def record_article(
def save_article(
self,
*,
title: str,
@@ -55,7 +56,7 @@ class BaseCrawler(ABC):
categories: List[str],
timestamp: int,
metadata: Any = None,
) -> None:
) -> Article:
if metadata is None:
metadata_value = None
elif is_dataclass(metadata) and not isinstance(metadata, type):
@@ -72,11 +73,16 @@ class BaseCrawler(ABC):
"timestamp": timestamp,
"metadata": metadata_value,
}
self._persist(article)
logging.info(f"> {title} [saved]")
logging.info(f"> {article['title']} [saved]")
return Article(**article)
@abstractmethod
def fetch_one(self, html: str, date_range: Optional[DateRange] = None) -> None:
def fetch_one(
self, html: str, date_range: Optional[DateRange] = None
) -> Article | None:
pass
@abstractmethod
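Note: the new subclass contract in miniature: fetch_one parses its input and returns the Article built by save_article. A sketch with stand-in extraction logic; only the save_article keyword signature comes from this diff:

class MinimalCrawler(BaseCrawler):
    def fetch_one(
        self, html: str, date_range: Optional[DateRange] = None
    ) -> Article | None:
        soup = BeautifulSoup(html, "html.parser")
        return self.save_article(
            title=soup.title.get_text(strip=True) if soup.title else "Untitled",
            link="https://example.org/post",  # stand-in; real crawlers extract this
            body=soup.get_text(" ", strip=True),
            categories=[],
            timestamp=0,  # epoch seconds in the real crawlers
        )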
@@ -4,6 +4,7 @@ from datetime import datetime, timezone
from typing import Optional, cast, override, Sequence
from urllib.parse import parse_qs, urljoin, urlparse
from basango.domain.article import Article
from bs4 import BeautifulSoup, Tag
from basango.core.config import CrawlerConfig, ClientConfig
@@ -116,7 +117,7 @@ class HtmlCrawler(BaseCrawler):
self.completed(self.config.notify)
@override
def fetch_one(self, html: str, date_range: Optional[DateRange] = None) -> None:
def fetch_one(self, html: str, date_range: Optional[DateRange] = None) -> Article:
soup = BeautifulSoup(html, "html.parser")
selectors = self.source.source_selectors
@@ -124,7 +125,7 @@ class HtmlCrawler(BaseCrawler):
link = self._current_article_url or self._extract_link(soup)
if not link:
logging.warning("Skipping article '%s' without link", title)
return
raise ValueError("Missing article link")
body = self._extract_body(soup, selectors.article_body)
categories = self._extract_categories(soup, selectors.article_categories)
@@ -139,7 +140,7 @@ class HtmlCrawler(BaseCrawler):
metadata = self.open_graph.consume_html(html)
self.record_article(
return self.save_article(
title=title,
link=link,
body=body,
@@ -3,6 +3,7 @@ import logging
from datetime import datetime, timezone
from typing import Optional, override, cast, Final, Any, Sequence
from basango.domain.article import Article
from bs4 import BeautifulSoup
from basango.core.config import WordPressSourceConfig, CrawlerConfig, ClientConfig
@@ -83,28 +84,27 @@ class WordpressCrawler(BaseCrawler):
self.completed(self.config.notify)
@override
def fetch_one(self, html: Any, date_range: Optional[DateRange] = None) -> None:
def fetch_one(self, html: Any, date_range: Optional[DateRange] = None) -> Article:
try:
data = json.loads(html) if isinstance(html, str) else html
except json.JSONDecodeError as exc:
logging.error("Failed to decode WordPress payload: %s", exc)
return
raise exc
if not isinstance(data, dict):
logging.error("Skipping unexpected WordPress payload: %s", type(data))
return
raise ValueError("Unexpected WordPress payload type")
link = data.get("link")
if not link:
logging.error("Skipping WordPress article without link")
return
raise ValueError("WordPress article without link")
title = BeautifulSoup(
data.get("title", {}).get("rendered", ""), "html.parser"
).get_text(" ", strip=True)
body = BeautifulSoup(
data.get("content", {}).get("rendered", ""), "html.parser"
).get_text(" ", strip=True)
title_html = data.get("title", {}).get("rendered", "")
body_html = data.get("content", {}).get("rendered", "")
title = BeautifulSoup(title_html, "html.parser").get_text(" ", strip=True)
body = BeautifulSoup(body_html, "html.parser").get_text(" ", strip=True)
timestamp = self._compute_timestamp(data.get("date"))
categories_value = self._map_categories(data.get("categories", []))
@@ -115,7 +115,7 @@ class WordpressCrawler(BaseCrawler):
metadata = self.open_graph.consume_url(link)
self.record_article(
return self.save_article(
title=title or data.get("slug", "Untitled"),
link=link,
body=body,
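Note: the rendered WordPress fields are HTML fragments with entities, which is why they go through BeautifulSoup before save_article. For example:

from bs4 import BeautifulSoup

fragment = "<h1>Breaking <em>news</em> &amp; analysis</h1>"
BeautifulSoup(fragment, "html.parser").get_text(" ", strip=True)
# -> 'Breaking news & analysis'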
@@ -1,11 +1,9 @@
from .base_persistor import BasePersistor
from .csv_persistor import CsvPersistor
from .json_persistor import JsonPersistor
from .api_persistor import ApiPersistor
__all__ = [
"BasePersistor",
"CsvPersistor",
"JsonPersistor",
"ApiPersistor",
]
@@ -1,38 +0,0 @@
import logging
from dataclasses import dataclass
from typing import Any, Mapping
from basango.core.config import ClientConfig
from basango.services.http_client import SyncHttpClient
from .base_persistor import BasePersistor
@dataclass
class ApiPersistor(BasePersistor):
endpoint: str
client_config: ClientConfig
raise_for_status: bool = True
def __post_init__(self) -> None:
self.client = SyncHttpClient(client_config=self.client_config)
def persist(self, article: Mapping[str, Any]) -> None:
"""POST the article payload to an HTTP endpoint.
By default `raise_for_status` is enabled so worker failures surface
quickly. Disable when using a fire-and-forget pipeline and rely on
logs/monitoring instead.
"""
try:
response = self.client.post(self.endpoint, json=article)
if self.raise_for_status:
response.raise_for_status()
except Exception as exc: # noqa: BLE001
logging.exception(
"[ApiPersistor] Failed to persist article at %s: %s",
self.endpoint,
exc,
)
if self.raise_for_status:
raise
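Note: with ApiPersistor deleted, the forwarding path keeps no raise_for_status equivalent: forward_for_processing logs and returns None on failure, and the response object is not inspected (unless SyncHttpClient raises internally). A condensed sketch of the replacement path for contrast; the helper name is hypothetical:

import os
from basango.services import SyncHttpClient

def forward(article, client_config) -> None:
    # Inline equivalent of the old ApiPersistor.persist; note that the
    # response status is no longer checked here.
    client = SyncHttpClient(client_config=client_config)
    client.post(
        os.getenv(
            "BASANGO_API_ENDPOINT",
            "http://localhost:8000/api/aggregator/articles?token=dev",
        ),
        json=article.to_dict(),
    )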