diff --git a/.env b/.env
index 5ec6e5b..58d3bc6 100644
--- a/.env
+++ b/.env
@@ -4,7 +4,7 @@ BASANGO_API_PORT=3080
 BASANGO_API_ALLOWED_ORIGINS=http://localhost:3000,http://127.0.0.1:3000
 BASANGO_API_KEY=your_api_key_here
 BASANGO_API_CRAWLER_TOKEN=dev
-BASANGO_API_CRAWLER_ENDPOINT="http://localhost:3080/articles"
+BASANGO_API_CRAWLER_ENDPOINT="http://localhost:3080"
 BASANGO_API_JWT_SECRET=your_jwt_secret_here
 
 # db
diff --git a/apps/api/src/rest/routers/index.ts b/apps/api/src/rest/routers/index.ts
index 8ce8050..021d0ef 100644
--- a/apps/api/src/rest/routers/index.ts
+++ b/apps/api/src/rest/routers/index.ts
@@ -1,9 +1,11 @@
 import { OpenAPIHono } from "@hono/zod-openapi";
 
 import { articlesRouter } from "#api/rest/routers/articles";
+import { sourcesRouter } from "#api/rest/routers/sources";
 
 const routers: OpenAPIHono = new OpenAPIHono();
 
 routers.route("/articles", articlesRouter);
+routers.route("/sources", sourcesRouter);
 
 export { routers };
diff --git a/apps/api/src/rest/routers/sources.ts b/apps/api/src/rest/routers/sources.ts
new file mode 100644
index 0000000..bed81e9
--- /dev/null
+++ b/apps/api/src/rest/routers/sources.ts
@@ -0,0 +1,58 @@
+import { getEarliestPublished, getLatestPublished } from "@basango/db/queries";
+import {
+  getSourceUpdateDatesResponseSchema,
+  getSourceUpdateDatesSchema,
+} from "@basango/domain/models";
+import { OpenAPIHono, createRoute } from "@hono/zod-openapi";
+
+import type { Context } from "#api/rest/init";
+import { withCrawlerAuth } from "#api/rest/middlewares/crawler";
+import { withDatabase } from "#api/rest/middlewares/db";
+import { validateResponse } from "#api/utils/response";
+
+const app = new OpenAPIHono<Context>();
+
+app.openapi(
+  createRoute({
+    description: "Get the latest and earliest published dates for articles from a specific source.",
+    method: "post",
+    middleware: [withCrawlerAuth, withDatabase],
+    operationId: "GetSourceUpdateDates",
+    path: "/update-dates",
+    request: {
+      body: {
+        content: {
+          "application/json": {
+            schema: getSourceUpdateDatesSchema,
+          },
+        },
+      },
+    },
+    responses: {
+      200: {
+        content: {
+          "application/json": {
+            schema: getSourceUpdateDatesResponseSchema,
+          },
+        },
+        description: "Source update dates retrieved",
+      },
+    },
+    summary: "Get Source Update Dates",
+    tags: ["Sources"],
+    "x-speakeasy-name-override": "getSourceUpdateDates",
+  }),
+  async (c) => {
+    const db = c.get("db");
+    const input = c.req.valid("json");
+
+    const [latest, earliest] = await Promise.all([
+      getLatestPublished(db, input.name),
+      getEarliestPublished(db, input.name),
+    ]);
+
+    return c.json(validateResponse({ earliest, latest }, getSourceUpdateDatesResponseSchema), 200);
+  },
+);
+
+export const sourcesRouter = app;
diff --git a/apps/crawler/src/process/async/handlers.ts b/apps/crawler/src/process/async/handlers.ts
index 20c35ca..17c38a1 100644
--- a/apps/crawler/src/process/async/handlers.ts
+++ b/apps/crawler/src/process/async/handlers.ts
@@ -1,23 +1,18 @@
 import type { HtmlSourceOptions, WordPressSourceOptions } from "@basango/domain/config";
-import { Article } from "@basango/domain/models";
 import { logger } from "@basango/logger";
 
 import { UnsupportedSourceKindError } from "#crawler/errors";
 import { QueueManager, createQueueManager } from "#crawler/process/async/queue";
-import {
-  DetailsTaskPayload,
-  ListingTaskPayload,
-  ProcessingTaskPayload,
-} from "#crawler/process/async/schemas";
+import { DetailsTaskPayload, ListingTaskPayload } from "#crawler/process/async/schemas";
 import { createPersistors, resolveCrawlerConfig } from "#crawler/process/crawler";
from "#crawler/process/crawler"; import { HtmlCrawler } from "#crawler/process/parsers/html"; import { WordPressCrawler } from "#crawler/process/parsers/wordpress"; -import { forward } from "#crawler/process/persistence"; import { createTimestampRange, formatPageRange, formatTimestampRange, resolveSourceConfig, + resolveSourceUpdateDates, } from "#crawler/utils"; export const collectHtmlListing = async ( @@ -30,6 +25,8 @@ export const collectHtmlListing = async ( } const settings = resolveCrawlerConfig(source, payload); + await resolveSourceUpdateDates(settings); + const crawler = new HtmlCrawler(settings); const pageRange = settings.pageRange ?? (await crawler.getPagination()); @@ -69,6 +66,8 @@ export const collectWordPressListing = async ( } const settings = resolveCrawlerConfig(source, payload); + await resolveSourceUpdateDates(settings); + const crawler = new WordPressCrawler(settings); const pageRange = settings.pageRange ?? (await crawler.getPagination()); @@ -99,10 +98,7 @@ export const collectWordPressListing = async ( return queued; }; -export const collectArticle = async ( - payload: DetailsTaskPayload, - manager: QueueManager = createQueueManager(), -): Promise => { +export const collectArticle = async (payload: DetailsTaskPayload): Promise => { const source = resolveSourceConfig(payload.sourceId); const settings = resolveCrawlerConfig(source, { category: payload.category, @@ -116,35 +112,14 @@ export const collectArticle = async ( const crawler = new HtmlCrawler(settings, { persistors }); const html = await crawler.crawl(payload.url); - const article = await crawler.fetchOne(html, settings.dateRange); - await manager.enqueueProcessed({ - article, - sourceId: payload.sourceId, - } as ProcessingTaskPayload); + return await crawler.fetchOne(html, settings.dateRange); } if (source.sourceKind === "wordpress") { const crawler = new WordPressCrawler(settings, { persistors }); - const article = await crawler.fetchOne(payload.data ?? {}, settings.dateRange); - await manager.enqueueProcessed({ - article, - sourceId: payload.sourceId, - } as ProcessingTaskPayload); + return await crawler.fetchOne(payload.data ?? {}, settings.dateRange); } throw new UnsupportedSourceKindError(`Unsupported source kind`); }; - -export const forwardForProcessing = async (payload: ProcessingTaskPayload): Promise
-  logger.info({ article: payload.article.title }, "Ready for downstream processing");
-
-  try {
-    logger.info({ article: payload.article.title }, "Forwarding article to API");
-    await forward(payload.article);
-  } catch (error) {
-    logger.error({ error }, "Failed to forward article to API");
-  }
-
-  return payload.article;
-};
diff --git a/apps/crawler/src/process/async/queue.ts b/apps/crawler/src/process/async/queue.ts
index 55795ec..c8cd9c2 100644
--- a/apps/crawler/src/process/async/queue.ts
+++ b/apps/crawler/src/process/async/queue.ts
@@ -9,8 +9,6 @@ import {
   DetailsTaskPayloadSchema,
   ListingTaskPayload,
   ListingTaskPayloadSchema,
-  ProcessingTaskPayload,
-  ProcessingTaskPayloadSchema,
 } from "#crawler/process/async/schemas";
 import { parseRedisUrl } from "#crawler/utils";
 
@@ -58,7 +56,6 @@ export interface QueueManager {
   readonly connection: IORedis;
   enqueueListing: (payload: ListingTaskPayload) => Promise<{ id: string }>;
   enqueueArticle: (payload: DetailsTaskPayload) => Promise<{ id: string }>;
-  enqueueProcessed: (payload: ProcessingTaskPayload) => Promise<{ id: string }>;
   iterQueueNames: () => string[];
   queueName: (suffix: string) => string;
   close: () => Promise<void>;
 }
@@ -92,16 +89,7 @@
       const queue = ensureQueue(asyncOptions.queues.listing);
       return queue.add("collect_listing", data);
     },
-    enqueueProcessed: (payload) => {
-      const data = ProcessingTaskPayloadSchema.parse(payload);
-      const queue = ensureQueue(asyncOptions.queues.processing);
-      return queue.add("forward_for_processing", data);
-    },
-    iterQueueNames: () => [
-      asyncOptions.queues.listing,
-      asyncOptions.queues.details,
-      asyncOptions.queues.processing,
-    ],
+    iterQueueNames: () => [asyncOptions.queues.listing, asyncOptions.queues.details],
     options: asyncOptions,
     queueName: (suffix: string) => `${asyncOptions.prefix}:${suffix}`,
   };
diff --git a/apps/crawler/src/process/async/schemas.ts b/apps/crawler/src/process/async/schemas.ts
index 5b51b2b..8ac6adc 100644
--- a/apps/crawler/src/process/async/schemas.ts
+++ b/apps/crawler/src/process/async/schemas.ts
@@ -1,4 +1,4 @@
-import { PageRangeSchema, TimestampRangeSchema, articleSchema } from "@basango/domain/models";
+import { PageRangeSchema, TimestampRangeSchema } from "@basango/domain/models";
 import { z } from "zod";
 
 export const ListingTaskPayloadSchema = z.object({
@@ -18,11 +18,5 @@ export const DetailsTaskPayloadSchema = z.object({
   url: z.url(),
 });
 
-export const ProcessingTaskPayloadSchema = z.object({
-  article: articleSchema,
-  sourceId: z.string(),
-});
-
 export type ListingTaskPayload = z.infer<typeof ListingTaskPayloadSchema>;
 export type DetailsTaskPayload = z.infer<typeof DetailsTaskPayloadSchema>;
-export type ProcessingTaskPayload = z.infer<typeof ProcessingTaskPayloadSchema>;
diff --git a/apps/crawler/src/process/async/tasks.ts b/apps/crawler/src/process/async/tasks.ts
index 2781754..4a7c596 100644
--- a/apps/crawler/src/process/async/tasks.ts
+++ b/apps/crawler/src/process/async/tasks.ts
@@ -2,11 +2,7 @@ import { logger } from "@basango/logger";
 
 import * as handlers from "#crawler/process/async/handlers";
 import { createQueueManager } from "#crawler/process/async/queue";
-import {
-  DetailsTaskPayloadSchema,
-  ListingTaskPayloadSchema,
-  ProcessingTaskPayloadSchema,
-} from "#crawler/process/async/schemas";
+import { DetailsTaskPayloadSchema, ListingTaskPayloadSchema } from "#crawler/process/async/schemas";
 import { CrawlingOptions } from "#crawler/process/crawler";
 
 export const collectListing = async (payload: unknown): Promise<unknown> => {
@@ -29,16 +25,6 @@ export const collectArticle = async (payload: unknown): Promise<unknown> => {
   return result;
 };
 
-export const forwardForProcessing = async (payload: unknown): Promise<unknown> => {
-  const data = ProcessingTaskPayloadSchema.parse(payload);
-  logger.debug({ sourceId: data.sourceId }, "Forwarding article for processing");
-
-  const result = await handlers.forwardForProcessing(data);
-  logger.info({ result }, "Article forwarded for processing");
-
-  return result;
-};
-
 export const scheduleAsyncCrawl = async (options: CrawlingOptions): Promise<void> => {
   const payload = ListingTaskPayloadSchema.parse({
     category: options.category,
diff --git a/apps/crawler/src/process/async/worker.ts b/apps/crawler/src/process/async/worker.ts
index 284afb5..a1cce4e 100644
--- a/apps/crawler/src/process/async/worker.ts
+++ b/apps/crawler/src/process/async/worker.ts
@@ -2,7 +2,7 @@ import { QueueEvents, Worker } from "bullmq";
 import IORedis from "ioredis";
 
 import { QueueFactory, QueueManager } from "#crawler/process/async/queue";
-import { collectArticle, collectListing, forwardForProcessing } from "#crawler/process/async/tasks";
+import { collectArticle, collectListing } from "#crawler/process/async/tasks";
 
 export interface WorkerOptions {
   queueNames?: string[];
@@ -36,8 +36,6 @@ export const startWorker = (options: WorkerOptions): WorkerHandle => {
         return collectListing(job.data);
       case "collect_article":
         return collectArticle(job.data);
-      case "forward_for_processing":
-        return forwardForProcessing(job.data);
       default:
         throw new Error(`Unknown job name: ${job.name}`);
     }
diff --git a/apps/crawler/src/process/parsers/html.ts b/apps/crawler/src/process/parsers/html.ts
index 92a0912..86846a9 100644
--- a/apps/crawler/src/process/parsers/html.ts
+++ b/apps/crawler/src/process/parsers/html.ts
@@ -90,7 +90,7 @@ export class HtmlCrawler extends BaseCrawler {
             { url: this.currentNode },
             "Article out of date range, stopping further processing",
           );
-          break;
+          process.exit(0); // stop further processing
         }
 
         logger.error({ error, url: this.currentNode }, "Failed to process HTML article");
diff --git a/apps/crawler/src/process/parsers/wordpress.ts b/apps/crawler/src/process/parsers/wordpress.ts
index 315fd72..3f5cd37 100644
--- a/apps/crawler/src/process/parsers/wordpress.ts
+++ b/apps/crawler/src/process/parsers/wordpress.ts
@@ -76,7 +76,7 @@ export class WordPressCrawler extends BaseCrawler {
             { url: node.link },
             "Article out of date range, stopping further processing",
           );
-          break;
+          process.exit(0); // stop further processing
         }
 
         logger.error({ error, url: node.link }, "Failed to process WordPress article");
diff --git a/apps/crawler/src/process/persistence.ts b/apps/crawler/src/process/persistence.ts
index e376f91..028f07a 100644
--- a/apps/crawler/src/process/persistence.ts
+++ b/apps/crawler/src/process/persistence.ts
@@ -2,7 +2,7 @@ import fs from "node:fs";
 import path from "node:path";
 
 import { config } from "@basango/domain/config";
-import type { Article } from "@basango/domain/models";
+import type { Article, SourceUpdateDates } from "@basango/domain/models";
 import { md5 } from "@basango/encryption";
 import logger from "@basango/logger";
 
@@ -61,19 +61,46 @@
     }
   }
 
+  forward(article).catch((error) => {
+    logger.error({ error }, "Failed to forward article");
+  });
+
   logger.info({ url: article.link }, "article successfully persisted");
 
   return article;
 };
 
+export const getSourceUpdateDates = async (sourceId: string): Promise<SourceUpdateDates> => {
+  const client = new SyncHttpClient(config.crawler.fetch.client);
+  const endpoint = config.crawler.backend.endpoint;
+
+  logger.info({ sourceId }, "Fetching source update dates");
+  const response = await client.post(`${endpoint}/sources/update-dates`, {
+    headers: {
+      Authorization: config.crawler.backend.token,
+    },
+    json: {
+      name: sourceId,
+    },
+  });
+
+  if (response.ok) {
+    const data = await response.json();
+    logger.info({ ...data }, "Retrieved source update dates");
+    return data;
+  }
+
+  logger.error({ sourceId, status: response.status }, "Failed to retrieve source update dates");
+  return { earliest: new Date(), latest: new Date() };
+};
+
 export const forward = async (payload: Partial<Article>): Promise<void> => {
   const client = new SyncHttpClient(config.crawler.fetch.client);
   const endpoint = config.crawler.backend.endpoint;
-  const token = config.crawler.backend.token;
 
   try {
-    const response = await client.post(endpoint, {
+    const response = await client.post(`${endpoint}/articles`, {
       headers: {
-        Authorization: `${token}`,
+        Authorization: config.crawler.backend.token,
       },
       json: payload,
     });
diff --git a/apps/crawler/src/process/sync/tasks.ts b/apps/crawler/src/process/sync/tasks.ts
index 63e6088..16a6f32 100644
--- a/apps/crawler/src/process/sync/tasks.ts
+++ b/apps/crawler/src/process/sync/tasks.ts
@@ -8,12 +8,13 @@
 } from "#crawler/process/crawler";
 import { HtmlCrawler } from "#crawler/process/parsers/html";
 import { WordPressCrawler } from "#crawler/process/parsers/wordpress";
-import { resolveSourceConfig } from "#crawler/utils";
+import { resolveSourceConfig, resolveSourceUpdateDates } from "#crawler/utils";
 
 export const runSyncCrawl = async (options: CrawlingOptions): Promise<void> => {
   const source = resolveSourceConfig(options.sourceId);
   const settings = resolveCrawlerConfig(source, options);
   const persistors = createPersistors(source);
+  await resolveSourceUpdateDates(settings);
 
   const crawler =
     source.sourceKind === "wordpress"
diff --git a/apps/crawler/src/utils.ts b/apps/crawler/src/utils.ts
index 3e646aa..64b0cf0 100644
--- a/apps/crawler/src/utils.ts
+++ b/apps/crawler/src/utils.ts
@@ -13,9 +13,12 @@ import {
   TimestampRange,
   TimestampRangeSchema,
 } from "@basango/domain/models";
+import logger from "@basango/logger";
 import { format, fromUnixTime, getUnixTime, isMatch, parse } from "date-fns";
 import type { RedisOptions } from "ioredis";
 
+import { getSourceUpdateDates } from "./process/persistence";
+
 /**
  * Resolve a source configuration by its ID.
  * @param id - The source ID
@@ -32,6 +35,41 @@
   return source;
 };
 
+export const resolveSourceUpdateDates = async (settings: {
+  dateRange?: TimestampRange;
+  direction: "forward" | "backward";
+  source?: AnySourceOptions;
+}) => {
+  if (settings.dateRange === undefined && settings.source) {
+    const dates = await getSourceUpdateDates(settings.source.sourceId);
+
+    switch (settings.direction) {
+      case "backward":
+        settings.dateRange = {
+          end: getUnixTime(dates.earliest),
+          start: getUnixTime(new Date()),
+        };
+        logger.info(
+          { dateRange: settings.dateRange, sourceId: settings.source.sourceId },
+          "Set date range end from earliest published date",
+        );
+        break;
+      case "forward":
+        if (dates.latest) {
+          settings.dateRange = {
+            end: getUnixTime(new Date()),
+            start: getUnixTime(dates.latest),
+          };
+          logger.info(
+            { dateRange: settings.dateRange, sourceId: settings.source.sourceId },
+            "Set date range start from latest published date",
+          );
+        }
+        break;
+    }
+  }
+};
+
 /**
  * Parse a Redis URL into RedisOptions.
  * @param url - The Redis URL (e.g., "redis://:password@localhost:6379/0")
diff --git a/packages/db/src/queries/sources.ts b/packages/db/src/queries/sources.ts
index 415d6e3..30a388a 100644
--- a/packages/db/src/queries/sources.ts
+++ b/packages/db/src/queries/sources.ts
@@ -1,6 +1,6 @@
 import { DEFAULT_CATEGORY_SHARES_LIMIT, DEFAULT_TIMEZONE } from "@basango/domain/constants";
 import { ID, Publication, Publications } from "@basango/domain/models";
-import { eq, sql } from "drizzle-orm";
+import { eq, max, min, sql } from "drizzle-orm";
 import * as uuid from "uuid";
 
 import { Database } from "#db/client";
@@ -161,3 +161,27 @@ export async function getSourceCategoryShares(
   return { items: data.rows, total: data.rowCount ?? 0 };
 }
+
+export async function getLatestPublished(db: Database, source: string): Promise<Date> {
+  const result = await db
+    .select({
+      publishedAt: max(articles.publishedAt),
+    })
+    .from(articles)
+    .innerJoin(sources, eq(articles.sourceId, sources.id))
+    .where(eq(sources.name, source));
+
+  return result[0]?.publishedAt ?? new Date();
+}
+
+export async function getEarliestPublished(db: Database, source: string): Promise<Date> {
+  const result = await db
+    .select({
+      publishedAt: min(articles.publishedAt),
+    })
+    .from(articles)
+    .innerJoin(sources, eq(articles.sourceId, sources.id))
+    .where(eq(sources.name, source));
+
+  return result[0]?.publishedAt ?? new Date();
+}
diff --git a/packages/domain/src/models/articles.ts b/packages/domain/src/models/articles.ts
index 8614e8e..b197937 100644
--- a/packages/domain/src/models/articles.ts
+++ b/packages/domain/src/models/articles.ts
@@ -8,9 +8,9 @@ export const articleMetadataSchema = z.object({
   author: z.string().optional(),
   description: z.string().optional(),
   image: z.url().optional(),
-  publishedAt: z.date().optional(),
+  publishedAt: z.string().optional(),
   title: z.string().optional(),
-  updatedAt: z.date().optional(),
+  updatedAt: z.string().optional(),
   url: z.url().optional(),
 });
 
@@ -25,7 +25,7 @@ export const tokenStatisticsSchema = z.object({
 export const articleSchema = z.object({
   body: z.string().min(1),
   categories: z.array(z.string()),
-  createdAt: z.date(),
+  createdAt: z.coerce.date(),
   excerpt: z.string().optional(),
   hash: z.string().min(1),
   id: idSchema,
@@ -38,7 +38,7 @@ export const articleSchema = z.object({
   sourceId: z.union([z.uuid(), z.string().min(1)]),
   title: z.string().min(1),
   tokenStatistics: tokenStatisticsSchema.optional(),
-  updatedAt: z.date().optional(),
+  updatedAt: z.coerce.date().optional(),
 });
 
 // API
diff --git a/packages/domain/src/models/sources.ts b/packages/domain/src/models/sources.ts
index 5150588..a1715af 100644
--- a/packages/domain/src/models/sources.ts
+++ b/packages/domain/src/models/sources.ts
@@ -39,5 +39,15 @@ export const updateSourceSchema = sourceSchema.pick({
   url: true,
 });
 
+export const getSourceUpdateDatesSchema = z.object({
+  name: z.string().min(1).max(255),
+});
+
+export const getSourceUpdateDatesResponseSchema = z.object({
+  earliest: z.coerce.date(),
+  latest: z.coerce.date(),
+});
+
 // types
 export type Source = z.infer<typeof sourceSchema>;
+export type SourceUpdateDates = z.infer<typeof getSourceUpdateDatesResponseSchema>;
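Manual check for the new route — a minimal sketch, assuming the defaults from `.env` above (API at http://localhost:3080, crawler token `dev`); "example-source" is a hypothetical source name, not one defined in this diff:

// Sketch: exercise POST /sources/update-dates by hand (ESM / top-level await).
// Assumptions: API at http://localhost:3080, BASANGO_API_CRAWLER_TOKEN=dev,
// and a hypothetical source name "example-source".
const response = await fetch("http://localhost:3080/sources/update-dates", {
  body: JSON.stringify({ name: "example-source" }),
  headers: {
    Authorization: "dev",
    "Content-Type": "application/json",
  },
  method: "POST",
});

if (!response.ok) {
  throw new Error(`update-dates request failed: ${response.status}`);
}

// The server serializes both dates as ISO strings; on the crawler side,
// getSourceUpdateDatesResponseSchema's z.coerce.date() turns them back into Dates.
const dates = (await response.json()) as { earliest: string; latest: string };
console.log(new Date(dates.earliest), new Date(dates.latest));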