From e105ff233f781e3006dd4095995acc7b0bad2376 Mon Sep 17 00:00:00 2001
From: bernard-ng
Date: Mon, 6 Oct 2025 20:00:54 +0200
Subject: [PATCH] fix: async queue

---
 projects/crawler/.gitignore                        |  1 +
 projects/crawler/src/basango/cli.py                | 41 +++++++++++++++----
 .../basango/services/crawler/async/queue.py        |  2 -
 .../basango/services/crawler/async/schemas.py      |  2 -
 .../basango/services/crawler/async/tasks.py        | 37 +++++++++++------
 .../basango/services/crawler/async/worker.py       | 14 ++++---
 .../src/basango/services/crawler/async_api.py      |  2 -
 .../services/http_client/async_http_client.py      |  2 -
 .../services/http_client/base_http_client.py       |  2 -
 .../services/http_client/sync_http_client.py       |  2 -
 .../services/persistence/api_persistor.py          | 15 +++----
 .../services/persistence/base_persistor.py         |  2 -
 .../services/persistence/csv_persistor.py          | 14 +++++--
 .../services/persistence/json_persistor.py         |  2 -
 .../services/crawler/test_html_crawler.py          |  5 +--
 .../crawler/test_wordpress_crawler.py              |  5 +--
 16 files changed, 88 insertions(+), 60 deletions(-)

diff --git a/projects/crawler/.gitignore b/projects/crawler/.gitignore
index 6bcf335..09649c3 100644
--- a/projects/crawler/.gitignore
+++ b/projects/crawler/.gitignore
@@ -9,6 +9,7 @@ var/
 
 # Python-generated files
 __pycache__/
+.pytest_cache/
 *.py[oc]
 build/
 dist/
diff --git a/projects/crawler/src/basango/cli.py b/projects/crawler/src/basango/cli.py
index 8746b73..be401fe 100644
--- a/projects/crawler/src/basango/cli.py
+++ b/projects/crawler/src/basango/cli.py
@@ -1,13 +1,12 @@
-from __future__ import annotations
-
 from typing import List, Optional
+from enum import Enum
 
 import typer
 
 from basango.core.config import CrawlerConfig
 from basango.core.config_manager import ConfigManager
 from basango.domain import DateRange, PageRange, UpdateDirection
-from basango.services import CsvPersistor
+from basango.services import CsvPersistor, JsonPersistor
 from basango.services.crawler.async_api import (
     QueueSettings,
     schedule_async_crawl,
@@ -19,6 +18,12 @@ from basango.services.crawler.wordpress_crawler import WordpressCrawler
 app = typer.Typer(no_args_is_help=True, add_completion=False)
 
 
+class QueueName(str, Enum):
+    listing = "listing"
+    articles = "articles"
+    processed = "processed"
+
+
 @app.command("crawl")
 def crawl_cmd(
     source_id: str = typer.Option(
@@ -79,7 +84,11 @@ def crawl_cmd(
         CsvPersistor(
             data_dir=pipeline.paths.data,
             source_id=str(source_identifier),
-        )
+        ),
+        JsonPersistor(
+            data_dir=pipeline.paths.data,
+            source_id=str(source_identifier),
+        ),
     ]
 
     for crawler in crawlers:
@@ -95,11 +104,22 @@ def worker_cmd(
-    queue: Optional[List[str]] = typer.Option(
+    queue: Optional[List[QueueName]] = typer.Option(
         None,
         "--queue",
         "-q",
-        help="Queue name(s) (without prefix). Provide multiple times to listen to more than one queue.",
+        help=(
+            "Queue name(s) (without prefix). Choices: listing, articles, processed. "
+            "Provide multiple times to listen to more than one queue."
+        ),
+    ),
+    simple: bool = typer.Option(
+        False,
+        "--simple/--no-simple",
+        help=(
+            "Run jobs in-process using RQ SimpleWorker (no forking). "
+            "Recommended on macOS to avoid fork-related crashes."
+        ),
     ),
     burst: bool = typer.Option(
         False,
@@ -125,5 +145,10 @@ def worker_cmd(
     manager.setup_logging(pipeline)
 
     settings = QueueSettings(redis_url=redis_url) if redis_url else QueueSettings()
-    queue_names = list(queue) if queue else None
-    start_worker(queue_names=queue_names, settings=settings, burst=burst)
+    queue_names = [q.value for q in queue] if queue else None
+    start_worker(
+        queue_names=queue_names,
+        settings=settings,
+        burst=burst,
+        simple=simple,
+    )
diff --git a/projects/crawler/src/basango/services/crawler/async/queue.py b/projects/crawler/src/basango/services/crawler/async/queue.py
index ce627e0..adb69a3 100644
--- a/projects/crawler/src/basango/services/crawler/async/queue.py
+++ b/projects/crawler/src/basango/services/crawler/async/queue.py
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import os
 from dataclasses import dataclass, field
 from typing import Iterable
diff --git a/projects/crawler/src/basango/services/crawler/async/schemas.py b/projects/crawler/src/basango/services/crawler/async/schemas.py
index 420dc53..2196a7b 100644
--- a/projects/crawler/src/basango/services/crawler/async/schemas.py
+++ b/projects/crawler/src/basango/services/crawler/async/schemas.py
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 from dataclasses import asdict, dataclass, fields
 from typing import Any, Mapping
 
diff --git a/projects/crawler/src/basango/services/crawler/async/tasks.py b/projects/crawler/src/basango/services/crawler/async/tasks.py
index 399376c..9e20916 100644
--- a/projects/crawler/src/basango/services/crawler/async/tasks.py
+++ b/projects/crawler/src/basango/services/crawler/async/tasks.py
@@ -1,12 +1,10 @@
-from __future__ import annotations
-
 import logging
 from typing import Any
 
 from basango.core.config import CrawlerConfig
 from basango.core.config_manager import ConfigManager
 from basango.domain import DateRange, PageRange, SourceKind, UpdateDirection
-from basango.services import CsvPersistor
+from basango.services import CsvPersistor, JsonPersistor, ApiPersistor
 from basango.services.crawler.html_crawler import HtmlCrawler
 from basango.services.crawler.wordpress_crawler import WordpressCrawler
 
@@ -101,7 +99,11 @@ def collect_article(payload: dict[str, Any]) -> dict[str, Any] | None:
         CsvPersistor(
             data_dir=pipeline.paths.data,
             source_id=str(source_identifier),
-        )
+        ),
+        JsonPersistor(
+            data_dir=pipeline.paths.data,
+            source_id=str(source_identifier),
+        ),
     ]
 
     queue_manager = QueueManager()
@@ -124,24 +126,29 @@ def collect_article(payload: dict[str, Any]) -> dict[str, Any] | None:
         )
         article = None
 
-    if article:
-        queue_manager.enqueue_processed(
-            ProcessedTaskPayload(
-                source_id=data.source_id,
-                env=data.env,
-                article=article,
-            )
+    queue_manager.enqueue_processed(
+        ProcessedTaskPayload(
+            source_id=data.source_id,
+            env=data.env,
+            article=article,
         )
+    )
+    if article:
         logger.info(
             "Persisted article %s and forwarded to processed queue",
             article.get("link"),
         )
+    else:
+        logger.info("No article persisted; forwarded empty payload to processed queue")
     return article
 
 
 def forward_for_processing(payload: dict[str, Any]) -> dict[str, Any] | None:
     data = ProcessedTaskPayload.from_dict(payload)
+    manager = ConfigManager()
+    pipeline = manager.get(data.env)
+
     article = dict(data.article) if data.article is not None else None
     if article is None:
         logger.info(
@@ -153,7 +160,13 @@ def forward_for_processing(payload: dict[str, Any]) -> dict[str, Any] | None:
         data.source_id,
         article.get("link"),
     )
-    return article
+
+    persistor = ApiPersistor(
endpoint="http://localhost:8000/api/articles", + client_config=pipeline.fetch.client, + ) + persistor.persist(article) + logger.info("Forwarded article %s to API", article.get("link")) def _collect_html_listing( diff --git a/projects/crawler/src/basango/services/crawler/async/worker.py b/projects/crawler/src/basango/services/crawler/async/worker.py index 87a9078..b9c15db 100644 --- a/projects/crawler/src/basango/services/crawler/async/worker.py +++ b/projects/crawler/src/basango/services/crawler/async/worker.py @@ -1,9 +1,7 @@ -from __future__ import annotations - import logging from typing import Sequence -from rq import Queue, Worker +from rq import Queue, Worker, SimpleWorker from .queue import QueueManager, QueueSettings @@ -16,6 +14,7 @@ def start_worker( *, settings: QueueSettings | None = None, burst: bool = False, + simple: bool = False, ) -> None: manager = QueueManager(settings=settings) if queue_names is None or not list(queue_names): @@ -24,6 +23,11 @@ def start_worker( resolved = [manager.queue_name(name) for name in queue_names] queues = [Queue(name, connection=manager.connection) for name in resolved] - logger.info("Starting RQ worker for queues %s", ", ".join(resolved)) - worker = Worker(queues, connection=manager.connection) + worker_cls = SimpleWorker if simple else Worker + logger.info( + "Starting RQ %s for queues %s", + worker_cls.__name__, + ", ".join(resolved), + ) + worker = worker_cls(queues, connection=manager.connection) worker.work(burst=burst) diff --git a/projects/crawler/src/basango/services/crawler/async_api.py b/projects/crawler/src/basango/services/crawler/async_api.py index 2f08ba5..4406dc3 100644 --- a/projects/crawler/src/basango/services/crawler/async_api.py +++ b/projects/crawler/src/basango/services/crawler/async_api.py @@ -1,5 +1,3 @@ -from __future__ import annotations - from importlib import import_module _async_queue = import_module("basango.services.crawler.async.queue") diff --git a/projects/crawler/src/basango/services/http_client/async_http_client.py b/projects/crawler/src/basango/services/http_client/async_http_client.py index 46b11f5..c405d63 100644 --- a/projects/crawler/src/basango/services/http_client/async_http_client.py +++ b/projects/crawler/src/basango/services/http_client/async_http_client.py @@ -1,5 +1,3 @@ -from __future__ import annotations - import asyncio from dataclasses import dataclass, field diff --git a/projects/crawler/src/basango/services/http_client/base_http_client.py b/projects/crawler/src/basango/services/http_client/base_http_client.py index 52caab7..2444f7c 100644 --- a/projects/crawler/src/basango/services/http_client/base_http_client.py +++ b/projects/crawler/src/basango/services/http_client/base_http_client.py @@ -1,5 +1,3 @@ -from __future__ import annotations - import random from abc import ABC, abstractmethod from dataclasses import dataclass, field diff --git a/projects/crawler/src/basango/services/http_client/sync_http_client.py b/projects/crawler/src/basango/services/http_client/sync_http_client.py index dc971f7..eeae8a7 100644 --- a/projects/crawler/src/basango/services/http_client/sync_http_client.py +++ b/projects/crawler/src/basango/services/http_client/sync_http_client.py @@ -1,5 +1,3 @@ -from __future__ import annotations - import time from dataclasses import dataclass, field diff --git a/projects/crawler/src/basango/services/persistence/api_persistor.py b/projects/crawler/src/basango/services/persistence/api_persistor.py index 1a49dc3..65b5e44 100644 --- 
--- a/projects/crawler/src/basango/services/persistence/api_persistor.py
+++ b/projects/crawler/src/basango/services/persistence/api_persistor.py
@@ -1,9 +1,8 @@
-from __future__ import annotations
-
 import logging
 from dataclasses import dataclass
 from typing import Any, Mapping
 
+from basango.core.config import ClientConfig
 from basango.services.http_client import SyncHttpClient
 
 from .base_persistor import BasePersistor
@@ -12,17 +11,15 @@
 @dataclass
 class ApiPersistor(BasePersistor):
     endpoint: str
-    http_client: SyncHttpClient
-    headers: dict[str, str] | None = None
+    client_config: ClientConfig
     raise_for_status: bool = True
 
+    def __post_init__(self) -> None:
+        self.client = SyncHttpClient(client_config=self.client_config)
+
     def persist(self, article: Mapping[str, Any]) -> None:
         try:
-            response = self.http_client.post(
-                self.endpoint,
-                json=article,
-                headers=self.headers,
-            )
+            response = self.client.post(self.endpoint, json=article)
             if self.raise_for_status:
                 response.raise_for_status()
         except Exception as exc:  # noqa: BLE001
diff --git a/projects/crawler/src/basango/services/persistence/base_persistor.py b/projects/crawler/src/basango/services/persistence/base_persistor.py
index 6385a43..f4376b3 100644
--- a/projects/crawler/src/basango/services/persistence/base_persistor.py
+++ b/projects/crawler/src/basango/services/persistence/base_persistor.py
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 from abc import ABC, abstractmethod
 from typing import Mapping, Any
 
diff --git a/projects/crawler/src/basango/services/persistence/csv_persistor.py b/projects/crawler/src/basango/services/persistence/csv_persistor.py
index 0d5bed7..0f8b2d2 100644
--- a/projects/crawler/src/basango/services/persistence/csv_persistor.py
+++ b/projects/crawler/src/basango/services/persistence/csv_persistor.py
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import csv
 import json
 from dataclasses import dataclass, field
@@ -44,7 +42,12 @@ class CsvPersistor(BasePersistor):
         with self._file_path.open(
             "a", newline="", encoding=self.encoding
         ) as handle:
-            writer = csv.DictWriter(handle, fieldnames=self.fieldnames)
+            writer = csv.DictWriter(
+                handle,
+                fieldnames=self.fieldnames,
+                quoting=csv.QUOTE_ALL,
+                lineterminator="\n",
+            )
             if needs_header:
                 writer.writeheader()
                 self._header_written = True
@@ -61,7 +64,10 @@
         if metadata is None or isinstance(metadata, str):
             serialised_metadata = metadata
         else:
-            serialised_metadata = json.dumps(metadata, ensure_ascii=False)
+            # JSON-encode metadata to a CSV-safe string; the csv module will quote it.
+            serialised_metadata = json.dumps(
+                metadata, ensure_ascii=True, separators=(",", ":"), sort_keys=True
+            )
 
         record = {field: article.get(field) for field in self.fieldnames}
         record["categories"] = serialised_categories
diff --git a/projects/crawler/src/basango/services/persistence/json_persistor.py b/projects/crawler/src/basango/services/persistence/json_persistor.py
index ea16b67..5ea729d 100644
--- a/projects/crawler/src/basango/services/persistence/json_persistor.py
+++ b/projects/crawler/src/basango/services/persistence/json_persistor.py
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import json
 from dataclasses import dataclass, field
 from pathlib import Path
diff --git a/projects/crawler/tests/basango/services/crawler/test_html_crawler.py b/projects/crawler/tests/basango/services/crawler/test_html_crawler.py
index 90a87ee..9fe7267 100644
--- a/projects/crawler/tests/basango/services/crawler/test_html_crawler.py
+++ b/projects/crawler/tests/basango/services/crawler/test_html_crawler.py
@@ -251,10 +251,9 @@ class TestHtmlCrawler:
         # Should extract the first number found (2024)
         assert result == 2024
 
-    def test_supports_html_source_kind(self, html_crawler):
+    def test_supports_html_source_kind(self):
         """Test that supports method returns True for HTML source kind."""
-        assert html_crawler.supports(SourceKind.HTML) is True
-        assert html_crawler.supports(SourceKind.WORDPRESS) is False
+        assert HtmlCrawler.supports() is SourceKind.HTML
 
     def test_get_pagination_integration(self, html_crawler):
         """Integration test for get_pagination calling get_last_page."""
diff --git a/projects/crawler/tests/basango/services/crawler/test_wordpress_crawler.py b/projects/crawler/tests/basango/services/crawler/test_wordpress_crawler.py
index 29206cc..e69c573 100644
--- a/projects/crawler/tests/basango/services/crawler/test_wordpress_crawler.py
+++ b/projects/crawler/tests/basango/services/crawler/test_wordpress_crawler.py
@@ -210,10 +210,9 @@ class TestWordPressCrawler:
         # Should be sorted by ID: 1, 2, 3
         assert result == "alpha,beta,charlie"
 
-    def test_supports_wordpress_source_kind(self, wordpress_crawler):
+    def test_supports_wordpress_source_kind(self):
         """Test supports method returns True for WordPress source kind."""
-        assert wordpress_crawler.supports(SourceKind.WORDPRESS) is True
-        assert wordpress_crawler.supports(SourceKind.HTML) is False
+        assert WordpressCrawler.supports() is SourceKind.WORDPRESS
 
     @pytest.mark.parametrize(
         "pages,posts,expected_start,expected_end",
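
Usage sketch for the new `--simple` flag: it maps onto `start_worker(..., simple=True)`,
which selects RQ's SimpleWorker and runs jobs inside the worker process instead of
forking per job. A minimal burst-mode smoke test, assuming a local Redis instance and
that `start_worker` is exported from `async_api` alongside `QueueSettings`, as the CLI
import above suggests (the URL and queue names below are illustrative, not values taken
from this patch):

    from basango.services.crawler.async_api import QueueSettings, start_worker

    # Equivalent to: basango worker -q listing -q articles --simple --burst
    # SimpleWorker executes each job in-process (no os.fork), and burst mode
    # exits once both queues are drained.
    settings = QueueSettings(redis_url="redis://localhost:6379/0")
    start_worker(
        queue_names=["listing", "articles"],  # unprefixed; QueueManager.queue_name adds the prefix
        settings=settings,
        burst=True,
        simple=True,
    )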