fix: async queue

- Constrain the worker CLI's --queue option to the known queue names (listing, articles, processed) via a QueueName enum.
- Add --simple/--no-simple to run jobs in-process with RQ's SimpleWorker instead of the forking Worker (avoids fork-related crashes on macOS).
- Persist crawled articles with JsonPersistor alongside CsvPersistor, in both the crawl CLI and the async handlers.
- Only enqueue to the processed queue when an article was actually collected, and forward processed articles to the API via ApiPersistor.
- ApiPersistor now takes a ClientConfig and builds its own SyncHttpClient in __post_init__.
- Harden CSV output: quote all fields, use "\n" as line terminator, and serialise metadata as compact, sorted, ASCII-safe JSON.
- Drop unneeded `from __future__ import annotations` imports; update crawler supports() tests to the new class-level signature returning the SourceKind.

2025-10-06 20:00:54 +02:00
parent 497633fb61
commit e105ff233f
16 changed files with 88 additions and 60 deletions
@@ -9,6 +9,7 @@ var/
 # Python-generated files
 __pycache__/
+.pytest_cache/
 *.py[oc]
 build/
 dist/
@@ -1,13 +1,12 @@
-from __future__ import annotations
-
 from typing import List, Optional
+from enum import Enum

 import typer

 from basango.core.config import CrawlerConfig
 from basango.core.config_manager import ConfigManager
 from basango.domain import DateRange, PageRange, UpdateDirection
-from basango.services import CsvPersistor
+from basango.services import CsvPersistor, JsonPersistor
 from basango.services.crawler.async_api import (
     QueueSettings,
     schedule_async_crawl,
@@ -19,6 +18,12 @@ from basango.services.crawler.wordpress_crawler import WordpressCrawler
 app = typer.Typer(no_args_is_help=True, add_completion=False)


+class QueueName(str, Enum):
+    listing = "listing"
+    articles = "articles"
+    processed = "processed"
+
+
 @app.command("crawl")
 def crawl_cmd(
     source_id: str = typer.Option(
@@ -79,7 +84,11 @@ def crawl_cmd(
         CsvPersistor(
             data_dir=pipeline.paths.data,
             source_id=str(source_identifier),
-        )
+        ),
+        JsonPersistor(
+            data_dir=pipeline.paths.data,
+            source_id=str(source_identifier),
+        ),
     ]

     for crawler in crawlers:
@@ -95,11 +104,22 @@ def crawl_cmd(

 @app.command("worker")
 def worker_cmd(
-    queue: Optional[List[str]] = typer.Option(
+    queue: Optional[List[QueueName]] = typer.Option(
         None,
         "--queue",
         "-q",
-        help="Queue name(s) (without prefix). Provide multiple times to listen to more than one queue.",
+        help=(
+            "Queue name(s) (without prefix). Choices: listing, articles, processed. "
+            "Provide multiple times to listen to more than one queue."
+        ),
     ),
+    simple: bool = typer.Option(
+        False,
+        "--simple/--no-simple",
+        help=(
+            "Run jobs in-process using RQ SimpleWorker (no forking). "
+            "Recommended on macOS to avoid fork-related crashes."
+        ),
+    ),
     burst: bool = typer.Option(
         False,
@@ -125,5 +145,10 @@ def worker_cmd(
     manager.setup_logging(pipeline)
     settings = QueueSettings(redis_url=redis_url) if redis_url else QueueSettings()
-    queue_names = list(queue) if queue else None
-    start_worker(queue_names=queue_names, settings=settings, burst=burst)
+    queue_names = [q.value for q in queue] if queue else None
+    start_worker(
+        queue_names=queue_names,
+        settings=settings,
+        burst=burst,
+        simple=simple,
+    )
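For context: with these options, a worker consuming several queues in-process could be launched like so (a sketch; it assumes the Typer app is exposed as a `basango` console script, which this diff does not show):

    basango worker -q listing -q articles --simple
    basango worker -q processed --simple --burst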
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import os
 from dataclasses import dataclass, field
 from typing import Iterable
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 from dataclasses import asdict, dataclass, fields
 from typing import Any, Mapping
@@ -1,12 +1,10 @@
-from __future__ import annotations
-
 import logging
 from typing import Any

 from basango.core.config import CrawlerConfig
 from basango.core.config_manager import ConfigManager
 from basango.domain import DateRange, PageRange, SourceKind, UpdateDirection
-from basango.services import CsvPersistor
+from basango.services import CsvPersistor, JsonPersistor, ApiPersistor
 from basango.services.crawler.html_crawler import HtmlCrawler
 from basango.services.crawler.wordpress_crawler import WordpressCrawler
@@ -101,7 +99,11 @@ def collect_article(payload: dict[str, Any]) -> dict[str, Any] | None:
         CsvPersistor(
             data_dir=pipeline.paths.data,
             source_id=str(source_identifier),
-        )
+        ),
+        JsonPersistor(
+            data_dir=pipeline.paths.data,
+            source_id=str(source_identifier),
+        ),
     ]

     queue_manager = QueueManager()
@@ -124,24 +126,29 @@ def collect_article(payload: dict[str, Any]) -> dict[str, Any] | None:
         )
         article = None

-    queue_manager.enqueue_processed(
-        ProcessedTaskPayload(
-            source_id=data.source_id,
-            env=data.env,
-            article=article,
-        )
-    )
-    logger.info(
-        "Persisted article %s and forwarded to processed queue",
-        article.get("link"),
-    )
+    if article:
+        queue_manager.enqueue_processed(
+            ProcessedTaskPayload(
+                source_id=data.source_id,
+                env=data.env,
+                article=article,
+            )
+        )
+    if article:
+        logger.info(
+            "Persisted article %s and forwarded to processed queue",
+            article.get("link"),
+        )
+    else:
+        logger.info("Persisted article and forwarded to processed queue")
     return article

 def forward_for_processing(payload: dict[str, Any]) -> dict[str, Any] | None:
     data = ProcessedTaskPayload.from_dict(payload)
+    manager = ConfigManager()
+    pipeline = manager.get(data.env)
     article = dict(data.article) if data.article is not None else None
     if article is None:
         logger.info(
@@ -153,7 +160,13 @@ def forward_for_processing(payload: dict[str, Any]) -> dict[str, Any] | None:
         data.source_id,
         article.get("link"),
     )
-    return article
+    persistor = ApiPersistor(
+        endpoint="http://localhost:8000/api/articles",
+        client_config=pipeline.fetch.client,
+    )
+    persistor.persist(article)
+    logger.info("Forwarded article %s to API", article.get("link"))
+    return article


 def _collect_html_listing(
@@ -1,9 +1,7 @@
-from __future__ import annotations
-
 import logging
 from typing import Sequence

-from rq import Queue, Worker
+from rq import Queue, Worker, SimpleWorker

 from .queue import QueueManager, QueueSettings
@@ -16,6 +14,7 @@ def start_worker(
     *,
     settings: QueueSettings | None = None,
     burst: bool = False,
+    simple: bool = False,
 ) -> None:
     manager = QueueManager(settings=settings)
     if queue_names is None or not list(queue_names):
@@ -24,6 +23,11 @@ def start_worker(
     resolved = [manager.queue_name(name) for name in queue_names]
     queues = [Queue(name, connection=manager.connection) for name in resolved]
-    logger.info("Starting RQ worker for queues %s", ", ".join(resolved))
-    worker = Worker(queues, connection=manager.connection)
+    worker_cls = SimpleWorker if simple else Worker
+    logger.info(
+        "Starting RQ %s for queues %s",
+        worker_cls.__name__,
+        ", ".join(resolved),
+    )
+    worker = worker_cls(queues, connection=manager.connection)
     worker.work(burst=burst)
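For reference, a minimal programmatic call of the updated signature (a sketch; it assumes start_worker is exported from basango.services.crawler.async_api alongside QueueSettings, as the CLI import above suggests, and relies on QueueSettings' default Redis URL, which is not shown in this diff):

    from basango.services.crawler.async_api import QueueSettings, start_worker

    # simple=True selects RQ's in-process SimpleWorker (no forking);
    # burst=True makes the worker exit once the queue is drained.
    start_worker(
        queue_names=["listing"],
        settings=QueueSettings(),
        burst=True,
        simple=True,
    )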
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 from importlib import import_module

 _async_queue = import_module("basango.services.crawler.async.queue")
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import asyncio
 from dataclasses import dataclass, field
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import random
 from abc import ABC, abstractmethod
 from dataclasses import dataclass, field
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import time
 from dataclasses import dataclass, field
@@ -1,9 +1,8 @@
-from __future__ import annotations
-
 import logging
 from dataclasses import dataclass
 from typing import Any, Mapping

+from basango.core.config import ClientConfig
 from basango.services.http_client import SyncHttpClient

 from .base_persistor import BasePersistor
@@ -12,17 +11,15 @@ from .base_persistor import BasePersistor
 @dataclass
 class ApiPersistor(BasePersistor):
     endpoint: str
-    http_client: SyncHttpClient
-    headers: dict[str, str] | None = None
+    client_config: ClientConfig
     raise_for_status: bool = True

+    def __post_init__(self) -> None:
+        self.client = SyncHttpClient(client_config=self.client_config)
+
     def persist(self, article: Mapping[str, Any]) -> None:
         try:
-            response = self.http_client.post(
-                self.endpoint,
-                json=article,
-                headers=self.headers,
-            )
+            response = self.client.post(self.endpoint, json=article)
             if self.raise_for_status:
                 response.raise_for_status()
         except Exception as exc:  # noqa: BLE001
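A standalone construction sketch for the reworked persistor (assumption: ClientConfig can be instantiated with defaults; in this commit the handlers pass pipeline.fetch.client instead):

    from basango.core.config import ClientConfig
    from basango.services import ApiPersistor

    # Endpoint mirrors the one hardcoded in the queue handlers above.
    persistor = ApiPersistor(
        endpoint="http://localhost:8000/api/articles",
        client_config=ClientConfig(),  # assumption: defaults suffice outside a pipeline
    )
    persistor.persist({"link": "https://example.org/post/1", "title": "Example"})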
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 from abc import ABC, abstractmethod
 from typing import Mapping, Any
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import csv
 import json
 from dataclasses import dataclass, field
@@ -44,7 +42,12 @@ class CsvPersistor(BasePersistor):
         with self._file_path.open(
             "a", newline="", encoding=self.encoding
         ) as handle:
-            writer = csv.DictWriter(handle, fieldnames=self.fieldnames)
+            writer = csv.DictWriter(
+                handle,
+                fieldnames=self.fieldnames,
+                quoting=csv.QUOTE_ALL,
+                lineterminator="\n",
+            )
             if needs_header:
                 writer.writeheader()
                 self._header_written = True
@@ -61,7 +64,10 @@ class CsvPersistor(BasePersistor):
         if metadata is None or isinstance(metadata, str):
             serialised_metadata = metadata
         else:
-            serialised_metadata = json.dumps(metadata, ensure_ascii=False)
+            # JSON-encode metadata to a string that is CSV-safe; csv module will quote it
+            serialised_metadata = json.dumps(
+                metadata, ensure_ascii=True, separators=(",", ":"), sort_keys=True
+            )

         record = {field: article.get(field) for field in self.fieldnames}
         record["categories"] = serialised_categories
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import json
 from dataclasses import dataclass, field
 from pathlib import Path
@@ -251,10 +251,9 @@ class TestHtmlCrawler:
         # Should extract the first number found (2024)
         assert result == 2024

-    def test_supports_html_source_kind(self, html_crawler):
+    def test_supports_html_source_kind(self):
         """Test that supports method returns True for HTML source kind."""
-        assert html_crawler.supports(SourceKind.HTML) is True
-        assert html_crawler.supports(SourceKind.WORDPRESS) is False
+        assert HtmlCrawler.supports() is SourceKind.HTML

     def test_get_pagination_integration(self, html_crawler):
         """Integration test for get_pagination calling get_last_page."""
@@ -210,10 +210,9 @@ class TestWordPressCrawler:
         # Should be sorted by ID: 1, 2, 3
         assert result == "alpha,beta,charlie"

-    def test_supports_wordpress_source_kind(self, wordpress_crawler):
+    def test_supports_wordpress_source_kind(self):
         """Test supports method returns True for WordPress source kind."""
-        assert wordpress_crawler.supports(SourceKind.WORDPRESS) is True
-        assert wordpress_crawler.supports(SourceKind.HTML) is False
+        assert WordpressCrawler.supports() is SourceKind.WORDPRESS

     @pytest.mark.parametrize(
         "pages,posts,expected_start,expected_end",