feat(crawler): compute source update dates

2025-11-25 01:05:39 +02:00
parent 72dfa53f80
commit 1d062f679b
16 changed files with 186 additions and 85 deletions
+2
@@ -1,9 +1,11 @@
import { OpenAPIHono } from "@hono/zod-openapi";
import { articlesRouter } from "#api/rest/routers/articles";
import { sourcesRouter } from "#api/rest/routers/sources";
const routers: OpenAPIHono = new OpenAPIHono();
routers.route("/articles", articlesRouter);
routers.route("/sources", sourcesRouter);
export { routers };
+58
@@ -0,0 +1,58 @@
import { getEarliestPublished, getLatestPublished } from "@basango/db/queries";
import {
getSourceUpdateDatesResponseSchema,
getSourceUpdateDatesSchema,
} from "@basango/domain/models";
import { OpenAPIHono, createRoute } from "@hono/zod-openapi";
import type { Context } from "#api/rest/init";
import { withCrawlerAuth } from "#api/rest/middlewares/crawler";
import { withDatabase } from "#api/rest/middlewares/db";
import { validateResponse } from "#api/utils/response";
const app = new OpenAPIHono<Context>();
app.openapi(
createRoute({
description: "Get the latest and earliest published dates for articles from a specific source.",
method: "post",
middleware: [withCrawlerAuth, withDatabase],
operationId: "GetSourceUpdateDates",
path: "/update-dates",
request: {
body: {
content: {
"application/json": {
schema: getSourceUpdateDatesSchema,
},
},
},
},
responses: {
200: {
content: {
"application/json": {
schema: getSourceUpdateDatesResponseSchema,
},
},
description: "Source update dates retrieved",
},
},
summary: "Get Source Update Dates",
tags: ["Sources"],
"x-speakeasy-name-override": "getSourceUpdateDates",
}),
async (c) => {
const db = c.get("db");
const input = c.req.valid("json");
const [latest, earliest] = await Promise.all([
getLatestPublished(db, input.name),
getEarliestPublished(db, input.name),
]);
return c.json(validateResponse({ earliest, latest }, getSourceUpdateDatesResponseSchema), 200);
},
);
export const sourcesRouter = app;
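For reference, a minimal sketch of calling the new endpoint from a plain HTTP client; the base URL, token, and source name are placeholders:

// Hypothetical values: substitute the real base URL, crawler token, and source name.
const response = await fetch("https://api.example.test/sources/update-dates", {
  body: JSON.stringify({ name: "my-source" }),
  headers: {
    Authorization: "<crawler-token>",
    "Content-Type": "application/json",
  },
  method: "POST",
});

// A 200 response carries { earliest, latest }, validated server-side against
// getSourceUpdateDatesResponseSchema.
const dates = await response.json();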
+9 -34
@@ -1,23 +1,18 @@
import type { HtmlSourceOptions, WordPressSourceOptions } from "@basango/domain/config";
import { Article } from "@basango/domain/models";
import { logger } from "@basango/logger";
import { UnsupportedSourceKindError } from "#crawler/errors";
import { QueueManager, createQueueManager } from "#crawler/process/async/queue";
import {
DetailsTaskPayload,
ListingTaskPayload,
ProcessingTaskPayload,
} from "#crawler/process/async/schemas";
import { DetailsTaskPayload, ListingTaskPayload } from "#crawler/process/async/schemas";
import { createPersistors, resolveCrawlerConfig } from "#crawler/process/crawler";
import { HtmlCrawler } from "#crawler/process/parsers/html";
import { WordPressCrawler } from "#crawler/process/parsers/wordpress";
import { forward } from "#crawler/process/persistence";
import {
createTimestampRange,
formatPageRange,
formatTimestampRange,
resolveSourceConfig,
resolveSourceUpdateDates,
} from "#crawler/utils";
export const collectHtmlListing = async (
@@ -30,6 +25,8 @@ export const collectHtmlListing = async (
}
const settings = resolveCrawlerConfig(source, payload);
await resolveSourceUpdateDates(settings);
const crawler = new HtmlCrawler(settings);
const pageRange = settings.pageRange ?? (await crawler.getPagination());
@@ -69,6 +66,8 @@ export const collectWordPressListing = async (
}
const settings = resolveCrawlerConfig(source, payload);
await resolveSourceUpdateDates(settings);
const crawler = new WordPressCrawler(settings);
const pageRange = settings.pageRange ?? (await crawler.getPagination());
@@ -99,10 +98,7 @@ export const collectWordPressListing = async (
return queued;
};
export const collectArticle = async (
payload: DetailsTaskPayload,
manager: QueueManager = createQueueManager(),
): Promise<unknown> => {
export const collectArticle = async (payload: DetailsTaskPayload): Promise<unknown> => {
const source = resolveSourceConfig(payload.sourceId);
const settings = resolveCrawlerConfig(source, {
category: payload.category,
@@ -116,35 +112,14 @@ export const collectArticle = async (
const crawler = new HtmlCrawler(settings, { persistors });
const html = await crawler.crawl(payload.url);
const article = await crawler.fetchOne(html, settings.dateRange);
await manager.enqueueProcessed({
article,
sourceId: payload.sourceId,
} as ProcessingTaskPayload);
return await crawler.fetchOne(html, settings.dateRange);
}
if (source.sourceKind === "wordpress") {
const crawler = new WordPressCrawler(settings, { persistors });
const article = await crawler.fetchOne(payload.data ?? {}, settings.dateRange);
await manager.enqueueProcessed({
article,
sourceId: payload.sourceId,
} as ProcessingTaskPayload);
return await crawler.fetchOne(payload.data ?? {}, settings.dateRange);
}
throw new UnsupportedSourceKindError(`Unsupported source kind: ${source.sourceKind}`);
};
export const forwardForProcessing = async (payload: ProcessingTaskPayload): Promise<Article> => {
logger.info({ article: payload.article.title }, "Ready for downstream processing");
try {
logger.info({ article: payload.article.title }, "Forwarding article to API");
await forward(payload.article);
} catch (error) {
logger.error({ error }, "Failed to forward article to API");
}
return payload.article;
};
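With the processing queue gone, collectArticle returns the parsed article directly and the crawler's persistors take care of persistence and forwarding. A minimal sketch of invoking it, with placeholder payload values shaped after DetailsTaskPayloadSchema:

// Placeholder payload values.
const article = await collectArticle({
  category: "news",
  sourceId: "my-source",
  url: "https://example.test/articles/123",
});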
+1 -13
@@ -9,8 +9,6 @@ import {
DetailsTaskPayloadSchema,
ListingTaskPayload,
ListingTaskPayloadSchema,
ProcessingTaskPayload,
ProcessingTaskPayloadSchema,
} from "#crawler/process/async/schemas";
import { parseRedisUrl } from "#crawler/utils";
@@ -58,7 +56,6 @@ export interface QueueManager {
readonly connection: IORedis;
enqueueListing: (payload: ListingTaskPayload) => Promise<{ id: string }>;
enqueueArticle: (payload: DetailsTaskPayload) => Promise<{ id: string }>;
enqueueProcessed: (payload: ProcessingTaskPayload) => Promise<{ id: string }>;
iterQueueNames: () => string[];
queueName: (suffix: string) => string;
close: () => Promise<void>;
@@ -92,16 +89,7 @@ export const createQueueManager = (options: CreateQueueManagerOptions = {}): Que
const queue = ensureQueue(asyncOptions.queues.listing);
return queue.add("collect_listing", data);
},
enqueueProcessed: (payload) => {
const data = ProcessingTaskPayloadSchema.parse(payload);
const queue = ensureQueue(asyncOptions.queues.processing);
return queue.add("forward_for_processing", data);
},
iterQueueNames: () => [
asyncOptions.queues.listing,
asyncOptions.queues.details,
asyncOptions.queues.processing,
],
iterQueueNames: () => [asyncOptions.queues.listing, asyncOptions.queues.details],
options: asyncOptions,
queueName: (suffix: string) => `${asyncOptions.prefix}:${suffix}`,
};
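A short usage sketch of the trimmed queue manager, assuming default options resolve the local Redis configuration; the payload values are placeholders:

const manager = createQueueManager();
console.log(manager.iterQueueNames()); // only the listing and details queue names remain

await manager.enqueueArticle({
  category: "news",
  sourceId: "my-source",
  url: "https://example.test/articles/123",
} as DetailsTaskPayload);
await manager.close();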
+1 -7
@@ -1,4 +1,4 @@
import { PageRangeSchema, TimestampRangeSchema, articleSchema } from "@basango/domain/models";
import { PageRangeSchema, TimestampRangeSchema } from "@basango/domain/models";
import { z } from "zod";
export const ListingTaskPayloadSchema = z.object({
@@ -18,11 +18,5 @@ export const DetailsTaskPayloadSchema = z.object({
url: z.url(),
});
export const ProcessingTaskPayloadSchema = z.object({
article: articleSchema,
sourceId: z.string(),
});
export type ListingTaskPayload = z.infer<typeof ListingTaskPayloadSchema>;
export type DetailsTaskPayload = z.infer<typeof DetailsTaskPayloadSchema>;
export type ProcessingTaskPayload = z.infer<typeof ProcessingTaskPayloadSchema>;
+1 -15
@@ -2,11 +2,7 @@ import { logger } from "@basango/logger";
import * as handlers from "#crawler/process/async/handlers";
import { createQueueManager } from "#crawler/process/async/queue";
import {
DetailsTaskPayloadSchema,
ListingTaskPayloadSchema,
ProcessingTaskPayloadSchema,
} from "#crawler/process/async/schemas";
import { DetailsTaskPayloadSchema, ListingTaskPayloadSchema } from "#crawler/process/async/schemas";
import { CrawlingOptions } from "#crawler/process/crawler";
export const collectListing = async (payload: unknown): Promise<number> => {
@@ -29,16 +25,6 @@ export const collectArticle = async (payload: unknown): Promise<unknown> => {
return result;
};
export const forwardForProcessing = async (payload: unknown): Promise<unknown> => {
const data = ProcessingTaskPayloadSchema.parse(payload);
logger.debug({ sourceId: data.sourceId }, "Forwarding article for processing");
const result = await handlers.forwardForProcessing(data);
logger.info({ result }, "Article forwarded for processing");
return result;
};
export const scheduleAsyncCrawl = async (options: CrawlingOptions): Promise<string> => {
const payload = ListingTaskPayloadSchema.parse({
category: options.category,
+1 -3
@@ -2,7 +2,7 @@ import { QueueEvents, Worker } from "bullmq";
import IORedis from "ioredis";
import { QueueFactory, QueueManager } from "#crawler/process/async/queue";
import { collectArticle, collectListing, forwardForProcessing } from "#crawler/process/async/tasks";
import { collectArticle, collectListing } from "#crawler/process/async/tasks";
export interface WorkerOptions {
queueNames?: string[];
@@ -36,8 +36,6 @@ export const startWorker = (options: WorkerOptions): WorkerHandle => {
return collectListing(job.data);
case "collect_article":
return collectArticle(job.data);
case "forward_for_processing":
return forwardForProcessing(job.data);
default:
throw new Error(`Unknown job name: ${job.name}`);
}
+1 -1
@@ -90,7 +90,7 @@ export class HtmlCrawler extends BaseCrawler {
{ url: this.currentNode },
"Article out of date range, stopping further processing",
);
break;
process.exit(0); // stop further processing
}
logger.error({ error, url: this.currentNode }, "Failed to process HTML article");
@@ -76,7 +76,7 @@ export class WordPressCrawler extends BaseCrawler {
{ url: node.link },
"Article out of date range, stopping further processing",
);
break;
process.exit(0); // stop further processing
}
logger.error({ error, url: node.link }, "Failed to process WordPress article");
+31 -4
@@ -2,7 +2,7 @@ import fs from "node:fs";
import path from "node:path";
import { config } from "@basango/domain/config";
import type { Article } from "@basango/domain/models";
import type { Article, SourceUpdateDates } from "@basango/domain/models";
import { md5 } from "@basango/encryption";
import logger from "@basango/logger";
@@ -61,19 +61,46 @@ export const persist = async (
}
}
forward(article).catch((error) => {
logger.error({ error }, "Failed to forward article");
});
logger.info({ url: article.link }, "Article successfully persisted");
return article;
};
export const getSourceUpdateDates = async (sourceId: string): Promise<SourceUpdateDates> => {
const client = new SyncHttpClient(config.crawler.fetch.client);
const endpoint = config.crawler.backend.endpoint;
logger.info({ sourceId }, "Fetching source update dates");
const response = await client.post(`${endpoint}/sources/update-dates`, {
headers: {
Authorization: config.crawler.backend.token,
},
json: {
name: sourceId,
},
});
if (response.ok) {
const data = await response.json();
logger.info({ ...data }, "Retrieved source update dates");
return data;
}
logger.error({ sourceId, status: response.status }, "Failed to retrieve source update dates");
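// Fall back to the current time for both bounds so callers always receive a usable range.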
return { earliest: new Date(), latest: new Date() };
};
export const forward = async (payload: Partial<Article>): Promise<void> => {
const client = new SyncHttpClient(config.crawler.fetch.client);
const endpoint = config.crawler.backend.endpoint;
const token = config.crawler.backend.token;
try {
const response = await client.post(endpoint, {
const response = await client.post(`${endpoint}/articles`, {
headers: {
Authorization: `${token}`,
Authorization: config.crawler.backend.token,
},
json: payload,
});
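On the crawler side the helper can be exercised directly; the source ID below is a placeholder:

// Placeholder source ID; endpoint and token come from config.crawler.backend.
const dates = await getSourceUpdateDates("my-source");
// dates.earliest and dates.latest bound the crawl window; on a failed request
// both fall back to the current time.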
+2 -1
@@ -8,12 +8,13 @@ import {
} from "#crawler/process/crawler";
import { HtmlCrawler } from "#crawler/process/parsers/html";
import { WordPressCrawler } from "#crawler/process/parsers/wordpress";
import { resolveSourceConfig } from "#crawler/utils";
import { resolveSourceConfig, resolveSourceUpdateDates } from "#crawler/utils";
export const runSyncCrawl = async (options: CrawlingOptions): Promise<void> => {
const source = resolveSourceConfig(options.sourceId);
const settings = resolveCrawlerConfig(source, options);
const persistors = createPersistors(source);
await resolveSourceUpdateDates(settings);
const crawler =
source.sourceKind === "wordpress"
+38
@@ -13,9 +13,12 @@ import {
TimestampRange,
TimestampRangeSchema,
} from "@basango/domain/models";
import logger from "@basango/logger";
import { format, fromUnixTime, getUnixTime, isMatch, parse } from "date-fns";
import type { RedisOptions } from "ioredis";
import { getSourceUpdateDates } from "./process/persistence";
/**
* Resolve a source configuration by its ID.
* @param id - The source ID
@@ -32,6 +35,41 @@ export const resolveSourceConfig = (id: string): AnySourceOptions => {
return source;
};
export const resolveSourceUpdateDates = async (settings: {
dateRange?: TimestampRange;
direction: "forward" | "backward";
source?: AnySourceOptions;
}) => {
if (settings.dateRange === undefined && settings.source) {
const dates = await getSourceUpdateDates(settings.source.sourceId);
switch (settings.direction) {
case "backward":
settings.dateRange = {
end: getUnixTime(dates.earliest),
start: getUnixTime(new Date()),
};
logger.info(
{ dateRange: settings.dateRange, sourceId: settings.source.sourceId },
"Set date range start from earliest published date",
);
break;
case "forward":
if (dates.latest) {
settings.dateRange = {
end: getUnixTime(new Date()),
start: getUnixTime(dates.latest),
};
logger.info(
{ dateRange: settings.dateRange, sourceId: settings.source.sourceId },
"Set date range start from latest published date",
);
}
break;
}
}
};
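To make the behaviour concrete, a worked sketch of the forward case, assuming the source has been crawled before (placeholder source ID):

// dateRange is left unset so resolveSourceUpdateDates fills it in.
const settings = {
  direction: "forward" as const,
  source: resolveSourceConfig("my-source"),
};

await resolveSourceUpdateDates(settings);
// If the backend reports latest = 2025-11-20T00:00:00Z, settings.dateRange
// becomes { start: getUnixTime(latest), end: getUnixTime(now) }, so only
// articles published since the last crawl are fetched.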
/**
* Parse a Redis URL into RedisOptions.
* @param url - The Redis URL (e.g., "redis://:password@localhost:6379/0")