feat(crawler): fix async crawling

This commit is contained in:
2025-11-09 01:01:07 +02:00
parent e8c0f0422b
commit 2b5482e9f5
58 changed files with 243 additions and 169 deletions
+23 -16
View File
@@ -1,17 +1,17 @@
import { logger } from "@basango/logger";
import { config, env } from "@/config";
import { UnsupportedSourceKindError } from "@/errors";
import { SyncHttpClient } from "@/http/http-client";
import { createQueueManager, QueueManager } from "@/process/async/queue";
import { QueueManager, createQueueManager } from "@/process/async/queue";
import {
DetailsTaskPayload,
ListingTaskPayload,
ProcessingTaskPayload,
} from "@/process/async/schemas";
import { resolveCrawlerConfig } from "@/process/crawler";
import { createPersistors, resolveCrawlerConfig } from "@/process/crawler";
import { HtmlCrawler } from "@/process/parsers/html";
import { WordPressCrawler } from "@/process/parsers/wordpress";
import { JsonlPersistor } from "@/process/persistence";
import { Article, HtmlSourceConfig, SourceKindSchema, WordPressSourceConfig } from "@/schema";
import { createDateRange, formatDateRange, formatPageRange, resolveSourceConfig } from "@/utils";
@@ -30,7 +30,7 @@ export const collectHtmlListing = async (
let queued = 0;
for (let page = pageRange.start; page <= pageRange.end; page += 1) {
const target = crawler.buildPageUrl(page) ?? `${source.sourceUrl}`;
const target = crawler.buildEndpointUrl(page) ?? `${source.sourceUrl}`;
try {
const items = await crawler.fetchLinks(target, source.sourceSelectors.articles);
@@ -69,7 +69,7 @@ export const collectWordPressListing = async (
let queued = 0;
for (let page = pageRange.start; page <= pageRange.end; page += 1) {
const url = crawler.postsEndpoint(page);
const url = crawler.buildEndpointUrl(page);
try {
const entries = await crawler.fetchLinks(url);
@@ -94,7 +94,10 @@ export const collectWordPressListing = async (
return queued;
};
export const collectArticle = async (payload: DetailsTaskPayload): Promise<unknown> => {
export const collectArticle = async (
payload: DetailsTaskPayload,
manager: QueueManager = createQueueManager(),
): Promise<unknown> => {
const source = resolveSourceConfig(payload.sourceId);
const settings = resolveCrawlerConfig(source, {
category: payload.category,
@@ -102,26 +105,30 @@ export const collectArticle = async (payload: DetailsTaskPayload): Promise<unkno
pageRange: payload.pageRange ? formatPageRange(payload.pageRange) : undefined,
sourceId: payload.sourceId,
});
const persistors = [
new JsonlPersistor({
directory: config.paths.data,
sourceId: String(source.sourceId),
}),
];
const persistors = createPersistors(source);
if (source.sourceKind === SourceKindSchema.enum.html) {
if (!payload.url) throw new Error("Missing article url");
const crawler = new HtmlCrawler(settings, { persistors });
const html = await crawler.crawl(payload.url);
return await crawler.fetchOne(html, settings.dateRange);
const article = await crawler.fetchOne(html, settings.dateRange);
await manager.enqueueProcessed({
article,
sourceId: payload.sourceId,
} as ProcessingTaskPayload);
}
if (source.sourceKind === SourceKindSchema.enum.wordpress) {
const crawler = new WordPressCrawler(settings, { persistors });
return await crawler.fetchOne(payload.data ?? {}, settings.dateRange);
const article = await crawler.fetchOne(payload.data ?? {}, settings.dateRange);
await manager.enqueueProcessed({
article,
sourceId: payload.sourceId,
} as ProcessingTaskPayload);
}
throw new Error(`Unsupported source kind`);
throw new UnsupportedSourceKindError(`Unsupported source kind`);
};
export const forwardForProcessing = async (payload: ProcessingTaskPayload): Promise<Article> => {
+6 -4
View File
@@ -1,7 +1,9 @@
import { randomUUID } from "node:crypto";
import { JobsOptions, Queue, QueueOptions } from "bullmq";
import IORedis from "ioredis";
import { config, FetchAsyncConfig } from "@/config";
import { FetchAsyncConfig, config } from "@/config";
import {
DetailsTaskPayload,
DetailsTaskPayloadSchema,
@@ -97,9 +99,9 @@ export const createQueueManager = (options: CreateQueueManagerOptions = {}): Que
return queue.add("forward_for_processing", data);
},
iterQueueNames: () => [
`${settings.prefix}:${settings.queues.listing}`,
`${settings.prefix}:${settings.queues.details}`,
`${settings.prefix}:${settings.queues.processing}`,
settings.queues.listing,
settings.queues.details,
settings.queues.processing,
],
queueName: (suffix: string) => `${settings.prefix}:${suffix}`,
settings,
@@ -1,4 +1,5 @@
import { z } from "zod";
import { ArticleSchema, DateRangeSchema, PageRangeSchema } from "@/schema";
export const ListingTaskPayloadSchema = z.object({
+1
View File
@@ -1,4 +1,5 @@
import { logger } from "@basango/logger";
import * as handlers from "@/process/async/handlers";
import { createQueueManager } from "@/process/async/queue";
import {
+5 -1
View File
@@ -45,6 +45,7 @@ export const startWorker = (options: WorkerOptions): WorkerHandle => {
{
concurrency: options.concurrency ?? 5,
connection,
prefix: manager.settings.prefix,
},
);
@@ -53,7 +54,10 @@ export const startWorker = (options: WorkerOptions): WorkerHandle => {
worker.on("error", (err) => options.onError?.(err as Error));
}
const queueEvents = new QueueEvents(queueName, { connection });
const queueEvents = new QueueEvents(queueName, {
connection,
prefix: manager.settings.prefix,
});
workers.push(worker);
events.push(queueEvents);