feat: crawl HTML and WordPress sources

author BernardNganduDev
date 2025-10-06 15:18:08 +02:00
parent 68d521677a
commit 039402110d
6 changed files with 398 additions and 25 deletions
@@ -42,11 +42,12 @@ def crawl_cmd(
)
crawlers = [
- HtmlCrawler(crawler_config, pipeline.fetch.client),
- WordpressCrawler(crawler_config, pipeline.fetch.client),
+ HtmlCrawler,
+ WordpressCrawler,
]
for crawler in crawlers:
- if crawler.supports(source.source_kind):
+ if crawler.supports() == source.source_kind:
+ crawler = crawler(crawler_config, pipeline.fetch.client)
crawler.fetch()
break
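Note: the crawler list now holds classes rather than instances, and supports() is a static method returning the crawler's SourceKind, so only the matching crawler is ever constructed. A minimal sketch of the dispatch pattern (simplified names, not the project's exact API):

# Dispatch sketch: compare classes by kind, then instantiate lazily.
from enum import StrEnum

class SourceKind(StrEnum):
    HTML = "html"
    WORDPRESS = "wordpress"

class HtmlCrawler:
    @staticmethod
    def supports() -> SourceKind:
        return SourceKind.HTML

class WordpressCrawler:
    @staticmethod
    def supports() -> SourceKind:
        return SourceKind.WORDPRESS

def pick_crawler(kind: SourceKind):
    for crawler_cls in (HtmlCrawler, WordpressCrawler):
        if crawler_cls.supports() == kind:
            return crawler_cls  # construct with the real config at the call site
    raise LookupError(f"no crawler for {kind}")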
@@ -1,4 +1,5 @@
import logging
import sys
from pathlib import Path
from typing import Optional, Union, Dict
@@ -8,6 +9,15 @@ from basango.core.config import PipelineConfig
from basango.core.project_paths import ProjectPaths
def _ensure_utf8_stream(stream):
try:
if hasattr(stream, "reconfigure"):
stream.reconfigure(encoding="utf-8", errors="replace")
except (AttributeError, ValueError):
return stream
return stream
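The helper leans on TextIOWrapper.reconfigure(), available since Python 3.7; the hasattr guard mainly protects wrapped or captured streams (pytest, some IDEs). A standalone check of the behavior it assumes:

import io

buf = io.TextIOWrapper(io.BytesIO(), encoding="ascii")
buf.reconfigure(encoding="utf-8", errors="replace")
print(buf.encoding)  # -> utf-8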
class ConfigManager:
def __init__(self, config_path: Optional[Union[str, Path]] = None):
self.config_path = Path(config_path) if config_path else self._find_config()
@@ -78,9 +88,13 @@ class ConfigManager:
# Clear existing handlers
root_logger.handlers.clear()
_ensure_utf8_stream(sys.stdout)
_ensure_utf8_stream(sys.stderr)
# Console handler
if cfg.logging.console_logging:
- console_handler = logging.StreamHandler()
+ console_handler = logging.StreamHandler(
+     stream=_ensure_utf8_stream(sys.stderr)
+ )
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
@@ -93,6 +107,7 @@ class ConfigManager:
log_file_path,
maxBytes=cfg.logging.max_log_size,
backupCount=cfg.logging.backup_count,
encoding="utf-8",
)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
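Without encoding="utf-8", RotatingFileHandler falls back to the locale encoding (often cp1252 on Windows) and can raise UnicodeEncodeError on non-Latin article titles. A minimal standalone repro of the fixed setup, assuming a writable crawl.log:

import logging
from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler("crawl.log", maxBytes=1_000_000, backupCount=3, encoding="utf-8")
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)
logging.info("Titre accentué : ça passe")  # logs safely regardless of console code page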
@@ -10,7 +10,7 @@ class SourceKind(StrEnum):
class SourceDate(BaseModel):
- format: str = "%Y-%m-%dT%H:%M:%S"
+ format: str = "%Y-%m-%d %H:%M"
pattern: Optional[str] = None
replacement: Optional[str] = None
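The looser default matches minute-precision listing dates; the old default expected full ISO timestamps with seconds. For reference:

from datetime import datetime

# New default accepts dates like "2025-10-06 15:18" without the "T" separator;
# the old default would require e.g. "2025-10-06T15:18:08".
print(datetime.strptime("2025-10-06 15:18", "%Y-%m-%d %H:%M"))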
@@ -1,13 +1,14 @@
import logging
from abc import ABC, abstractmethod
- from typing import Optional
+ from dataclasses import asdict, is_dataclass
+ from typing import Optional, Any, Dict, List
from bs4 import BeautifulSoup
from basango.core.config import CrawlerConfig, ClientConfig
from basango.domain import DateRange, SourceKind, PageRange
from basango.domain.exception import ArticleOutOfRange
- from basango.services import HttpClient
+ from basango.services import HttpClient, DateParser, OpenGraphProvider
class BaseCrawler(ABC):
@@ -17,6 +18,9 @@ class BaseCrawler(ABC):
self.config = crawler_config
self.source = crawler_config.source
self.client = HttpClient(client_config=client_config)
self.results: List[Dict[str, Any]] = []
self.date_parser = DateParser()
self.open_graph = OpenGraphProvider()
@abstractmethod
def fetch(self) -> None:
@@ -29,6 +33,34 @@ class BaseCrawler(ABC):
response = self.client.get(url).text
return BeautifulSoup(response, "html.parser")
def record_article(
self,
*,
title: str,
link: str,
body: str,
categories: List[str],
timestamp: int,
metadata: Any = None,
) -> None:
if metadata is None:
metadata_value = None
elif is_dataclass(metadata) and not isinstance(metadata, type):
metadata_value = asdict(metadata)
else:
metadata_value = metadata
article = {
"title": title,
"link": link,
"body": body,
"categories": categories,
"source": getattr(self.source, "source_id", None),
"timestamp": timestamp,
"metadata": metadata_value,
}
self.results.append(article)
logging.info(f"> {title} [saved]")
@abstractmethod
def fetch_one(self, html: str, date_range: Optional[DateRange] = None) -> None:
pass
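record_article() normalizes dataclass metadata with asdict(); the isinstance(metadata, type) guard matters because is_dataclass() is also true for dataclass classes themselves, which asdict() rejects. A quick illustration with a hypothetical metadata type:

from dataclasses import asdict, dataclass, is_dataclass

@dataclass
class OpenGraphMeta:  # hypothetical shape, not the provider's real type
    image: str
    description: str

meta = OpenGraphMeta("https://example.org/og.png", "Summary")
assert is_dataclass(meta) and not isinstance(meta, type)
print(asdict(meta))  # {'image': 'https://example.org/og.png', 'description': 'Summary'}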
@@ -40,8 +72,9 @@ class BaseCrawler(ABC):
def get_last_page(self) -> int:
return 1
+ @staticmethod
@abstractmethod
- def supports(self, source_kind: SourceKind) -> bool:
+ def supports() -> SourceKind:
pass
@classmethod
@@ -1,10 +1,15 @@
import logging
import re
from datetime import datetime, timezone
from typing import Optional, cast, override
- from urllib.parse import urlparse, parse_qs
+ from urllib.parse import parse_qs, urljoin, urlparse
from bs4 import BeautifulSoup, Tag
from basango.core.config import CrawlerConfig, ClientConfig
from basango.core.config.source_config import HtmlSourceConfig
- from basango.domain import PageRange, SourceKind, DateRange
+ from basango.domain import DateRange, PageRange, SourceKind
from basango.domain.exception import ArticleOutOfRange
from basango.services.crawler.base_crawler import BaseCrawler
@@ -17,16 +22,114 @@ class HtmlCrawler(BaseCrawler):
raise ValueError("HtmlCrawler requires a source of kind HTML")
self.source = cast(HtmlSourceConfig, self.source)
self._current_article_url: Optional[str] = None
@override
def fetch(self) -> None:
self.initialize()
- page = self.config.page_range or self.get_pagination()
- print(page)
+ page_range = self.config.page_range or self.get_pagination()
date_range = self.config.date_range
article_selector = self.source.source_selectors.articles
if not article_selector:
logging.error(
"No article selector configured for HTML source %s",
self.source.source_id,
)
return
stop = False
for page_number in range(page_range.start, page_range.end + 1):
page_url = self._build_page_url(page_number)
try:
soup = self.crawl(page_url, page_number)
except Exception as exc: # noqa: BLE001
logging.error(
"> page %s => %s [failed]",
page_number,
exc,
)
continue
articles = soup.select(article_selector)
if not articles:
logging.info("No articles found on page %s", page_number)
continue
for article in articles:
try:
self._current_article_url = self._extract_link(article)
target_html = str(article)
if self.source.requires_details:
if not self._current_article_url:
logging.debug(
"Skipping article without link for details on page %s",
page_number,
)
continue
try:
detail_soup = self.crawl(self._current_article_url)
target_html = str(detail_soup)
except Exception as detail_exc: # noqa: BLE001
logging.error(
"Failed to fetch detail page %s: %s",
self._current_article_url,
detail_exc,
)
continue
self.fetch_one(target_html, date_range)
except ArticleOutOfRange:
logging.info("No more articles to fetch in this range.")
stop = True
break
except Exception as exc: # noqa: BLE001
logging.error(
"Failed to process article on %s: %s",
page_url,
exc,
)
finally:
self._current_article_url = None
if stop:
break
self.completed(self.config.notify)
@override
def fetch_one(self, html: str, date_range: Optional[DateRange] = None) -> None:
- pass
soup = BeautifulSoup(html, "html.parser")
selectors = self.source.source_selectors
title = self._extract_text(soup, selectors.article_title) or "Untitled"
link = self._current_article_url or self._extract_link(soup)
if not link:
logging.warning("Skipping article '%s' without link", title)
return
body = self._extract_body(soup, selectors.article_body)
categories = self._extract_categories(soup, selectors.article_categories)
if not categories and self.config.category:
categories = [self.config.category]
raw_date = self._extract_text(soup, selectors.article_date)
timestamp = self._compute_timestamp(raw_date)
if date_range and not date_range.in_range(timestamp):
self.skip(date_range, str(timestamp), title, raw_date or "")
metadata = self.open_graph.consume_html(html)
self.record_article(
title=title,
link=link,
body=body,
categories=categories,
timestamp=timestamp,
metadata=metadata,
)
@override
def get_pagination(self) -> PageRange:
@@ -67,6 +170,128 @@ class HtmlCrawler(BaseCrawler):
return 1
return 1
+ @staticmethod
@override
- def supports(self, source_kind: SourceKind) -> bool:
- return source_kind == SourceKind.HTML
+ def supports() -> SourceKind:
+ return SourceKind.HTML
def _build_page_url(self, page: int) -> str:
template = self._apply_category(self.source.pagination_template)
if "{page}" in template:
template = template.format(page=page)
elif page > 0:
separator = "&" if "?" in template else "?"
template = f"{template}{separator}page={page}"
base = str(self.source.source_url)
if not base.endswith("/"):
base = f"{base}/"
return urljoin(base, template.lstrip("/"))
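_build_page_url() depends on urljoin() semantics, which is why the base is forced to end with a slash; without it, urljoin replaces the last path segment instead of extending it. For a hypothetical source_url:

from urllib.parse import urljoin

base = "https://example.org/news"  # hypothetical source_url
print(urljoin(base, "page/2"))        # https://example.org/page/2  (segment dropped)
print(urljoin(base + "/", "page/2"))  # https://example.org/news/page/2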
def _apply_category(self, template: str) -> str:
if "{category}" in template:
replacement = self.config.category or ""
return template.replace("{category}", replacement)
return template
def _extract_link(self, node: BeautifulSoup | Tag) -> Optional[str]:
selector = self.source.source_selectors.article_link
if not selector:
return None
target = node.select_one(selector)
if not target:
return None
raw_href = target.get("href") or target.get("data-href") or target.get("src")
href: Optional[str]
if isinstance(raw_href, str):
href = raw_href.strip() or None
elif isinstance(raw_href, list):
href = next(
(
item.strip()
for item in raw_href
if isinstance(item, str) and item.strip()
),
None,
)
else:
href = None
if not href:
return None
return self._to_absolute_url(href)
def _to_absolute_url(self, href: str) -> str:
base = str(self.source.source_url)
if not base.endswith("/"):
base = f"{base}/"
return urljoin(base, href)
@staticmethod
def _extract_text(
node: BeautifulSoup | Tag, selector: Optional[str]
) -> Optional[str]:
if not selector:
return None
target = node.select_one(selector)
if not target:
return None
if target.name == "img":
for attr in ("alt", "title"):
value = target.get(attr)
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
elif isinstance(value, list):
for item in value:
if isinstance(item, str):
stripped = item.strip()
if stripped:
return stripped
text = target.get_text(" ", strip=True)
return text or None
@staticmethod
def _extract_body(node: BeautifulSoup | Tag, selector: Optional[str]) -> str:
if selector:
matches = node.select(selector)
if matches:
parts = [
item.get_text(" ", strip=True)
for item in matches
if item.get_text(strip=True)
]
if parts:
return "".join(parts)
return node.get_text(" ", strip=True)
@staticmethod
def _extract_categories(
node: BeautifulSoup | Tag, selector: Optional[str]
) -> list[str]:
if not selector:
return []
values: list[str] = []
for item in node.select(selector):
text = item.get_text(" ", strip=True)
if text:
lower = text.lower()
if lower not in values:
values.append(lower)
return values
def _compute_timestamp(self, raw_date: Optional[str]) -> int:
if not raw_date:
return int(datetime.now(timezone.utc).timestamp())
return self.date_parser.create_timestamp(
raw_date.strip(),
fmt=self.source.source_date.format,
pattern=self.source.source_date.pattern,
replacement=self.source.source_date.replacement,
)
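_compute_timestamp() delegates to the project's DateParser; assuming the regex-rewrite-then-strptime semantics suggested by the call, a rough standalone equivalent would be:

import re
from datetime import datetime, timezone

def to_timestamp(raw, fmt, pattern=None, replacement=None):
    # Assumed semantics: optional regex rewrite, then strptime, read as UTC.
    if pattern and replacement is not None:
        raw = re.sub(pattern, replacement, raw)
    return int(datetime.strptime(raw, fmt).replace(tzinfo=timezone.utc).timestamp())

print(to_timestamp("2025-10-06 15:18", "%Y-%m-%d %H:%M"))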
@@ -1,8 +1,13 @@
import json
import logging
- from typing import Optional, override, cast, Final
+ from datetime import datetime, timezone
+ from typing import Optional, override, cast, Final, Any
from bs4 import BeautifulSoup
from basango.core.config import WordPressSourceConfig, CrawlerConfig, ClientConfig
from basango.domain import SourceKind, PageRange, DateRange
from basango.domain.exception import ArticleOutOfRange
from basango.services.crawler.base_crawler import BaseCrawler
@@ -15,6 +20,7 @@ class WordpressCrawler(BaseCrawler):
raise ValueError("WordpressCrawler requires a source of kind WORDPRESS")
self.source = cast(WordPressSourceConfig, self.source)
+ self.category_map: dict[int, str] = {}
POST_QUERY: Final = "_fields=date,slug,link,title.rendered,content.rendered,categories&orderby=date&order=desc"
CATEGORY_QUERY: Final = (
@@ -23,17 +29,86 @@ class WordpressCrawler(BaseCrawler):
TOTAL_PAGES_HEADER: Final = "x-wp-totalpages"
TOTAL_POSTS_HEADER: Final = "x-wp-total"
- category_map: dict[int, str] = {}
@override
def fetch(self) -> None:
self.initialize()
- page = self.config.page_range or self.get_pagination()
- print(page)
+ page_range = self.config.page_range or self.get_pagination()
date_range = self.config.date_range
stop = False
for page_number in range(page_range.start, page_range.end + 1):
endpoint = self._posts_endpoint(page_number)
try:
response = self.client.get(endpoint)
payload = response.text
articles = json.loads(payload)
except Exception as exc: # noqa: BLE001
logging.error(
"> page %s => %s [failed]",
page_number,
exc,
)
continue
for article in articles:
try:
self.fetch_one(article, date_range)
except ArticleOutOfRange:
logging.info("No more articles to fetch in this range.")
stop = True
break
except Exception as exc: # noqa: BLE001
logging.error(
"Failed to process WordPress article on page %s: %s",
page_number,
exc,
)
if stop:
break
self.completed(self.config.notify)
@override
- def fetch_one(self, html: str, date_range: Optional[DateRange] = None) -> None:
- pass
+ def fetch_one(self, html: Any, date_range: Optional[DateRange] = None) -> None:
try:
data = json.loads(html) if isinstance(html, str) else html
except json.JSONDecodeError as exc:
logging.error("Failed to decode WordPress payload: %s", exc)
return
if not isinstance(data, dict):
logging.error("Skipping unexpected WordPress payload: %s", type(data))
return
link = data.get("link")
if not link:
logging.error("Skipping WordPress article without link")
return
title = BeautifulSoup(
data.get("title", {}).get("rendered", ""), "html.parser"
).get_text(" ", strip=True)
body = BeautifulSoup(
data.get("content", {}).get("rendered", ""), "html.parser"
).get_text(" ", strip=True)
timestamp = self._compute_timestamp(data.get("date"))
categories_value = self._map_categories(data.get("categories", []))
categories = [item for item in categories_value.split(",") if item]
if date_range and not date_range.in_range(timestamp):
self.skip(date_range, str(timestamp), title, data.get("date", ""))
metadata = self.open_graph.consume_url(link)
self.record_article(
title=title or data.get("slug", "Untitled"),
link=link,
body=body,
categories=categories,
timestamp=timestamp,
metadata=metadata,
)
@override
def get_pagination(self) -> PageRange:
@@ -43,7 +118,7 @@ class WordpressCrawler(BaseCrawler):
pages = int(response.headers.get(self.TOTAL_PAGES_HEADER, "1"))
posts = int(response.headers.get(self.TOTAL_POSTS_HEADER, "0"))
logging.info(f"WordPress Pagination {posts} posts in {pages} pages")
logging.info("WordPress Pagination %s posts in %s pages", posts, pages)
return PageRange.create(f"1:{pages}")
def _fetch_categories(self) -> None:
@@ -62,10 +137,34 @@ class WordpressCrawler(BaseCrawler):
if category in self.category_map
)
def _posts_endpoint(self, page: int) -> str:
base = str(self.source.source_url)
if not base.endswith("/"):
base = f"{base}/"
return f"{base}wp-json/wp/v2/posts?{self.POST_QUERY}&page={page}&per_page=100"
@staticmethod
def _compute_timestamp(raw: Optional[str]) -> int:
if not raw:
return int(datetime.now(timezone.utc).timestamp())
cleaned = raw.replace("Z", "+00:00")
try:
dt = datetime.fromisoformat(cleaned)
except ValueError:
return int(datetime.now(timezone.utc).timestamp())
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return int(dt.timestamp())
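datetime.fromisoformat() only accepts a trailing "Z" natively from Python 3.11, and WordPress GMT date fields may carry one, hence the manual replace. For example:

from datetime import datetime

raw = "2025-10-06T15:18:08Z"  # hypothetical WordPress date_gmt value
print(int(datetime.fromisoformat(raw.replace("Z", "+00:00")).timestamp()))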
@override
def get_last_page(self) -> int:
return 1
+ @staticmethod
@override
- def supports(self, source_kind: SourceKind) -> bool:
- return source_kind == SourceKind.WORDPRESS
+ def supports() -> SourceKind:
+ return SourceKind.WORDPRESS