feat(monorepo): migrate to typescript monorepo
@@ -0,0 +1,137 @@ @/process/async/handlers (module path inferred from imports elsewhere in this commit)
import { logger } from "@basango/logger";

import { config, env } from "@/config";
import { SyncHttpClient } from "@/http/http-client";
import { createQueueManager, QueueManager } from "@/process/async/queue";
import {
  DetailsTaskPayload,
  ListingTaskPayload,
  ProcessingTaskPayload,
} from "@/process/async/schemas";
import { resolveCrawlerConfig } from "@/process/crawler";
import { HtmlCrawler } from "@/process/parsers/html";
import { WordPressCrawler } from "@/process/parsers/wordpress";
import { JsonlPersistor } from "@/process/persistence";
import { Article, HtmlSourceConfig, SourceKindSchema, WordPressSourceConfig } from "@/schema";
import { createDateRange, formatDateRange, formatPageRange, resolveSourceConfig } from "@/utils";

export const collectHtmlListing = async (
  payload: ListingTaskPayload,
  manager: QueueManager = createQueueManager(),
): Promise<number> => {
  const source = resolveSourceConfig(payload.sourceId) as HtmlSourceConfig;
  // Delegate to the WordPress collector when the configured kind doesn't match.
  if (source.sourceKind !== "html") {
    return await collectWordPressListing(payload, manager);
  }

  const settings = resolveCrawlerConfig(source, payload);
  const crawler = new HtmlCrawler(settings);
  const pageRange = settings.pageRange ?? (await crawler.getPagination());

  let queued = 0;
  for (let page = pageRange.start; page <= pageRange.end; page += 1) {
    const target = crawler.buildPageUrl(page) ?? `${source.sourceUrl}`;

    try {
      const items = await crawler.fetchLinks(target, source.sourceSelectors.articles);
      for (const node of items) {
        const url = crawler.extractLink(node);
        if (!url) continue;

        await manager.enqueueArticle({
          category: payload.category,
          dateRange: createDateRange(payload.dateRange),
          sourceId: payload.sourceId,
          url,
        } as DetailsTaskPayload);
        queued += 1;
      }
    } catch (error) {
      logger.error({ error, target }, "Failed to crawl page");
    }
  }

  return queued;
};

export const collectWordPressListing = async (
  payload: ListingTaskPayload,
  manager: QueueManager = createQueueManager(),
): Promise<number> => {
  const source = resolveSourceConfig(payload.sourceId) as WordPressSourceConfig;
  // Mirror of the delegation above, so either entry point accepts either kind.
  if (source.sourceKind !== "wordpress") {
    return await collectHtmlListing(payload, manager);
  }

  const settings = resolveCrawlerConfig(source, payload);
  const crawler = new WordPressCrawler(settings);
  const pageRange = settings.pageRange ?? (await crawler.getPagination());

  let queued = 0;
  for (let page = pageRange.start; page <= pageRange.end; page += 1) {
    const endpoint = crawler.postsEndpoint(page);

    try {
      const entries = await crawler.fetchLinks(endpoint);
      for (const data of entries) {
        const url = data.link;
        if (!url) continue;

        await manager.enqueueArticle({
          category: payload.category,
          data,
          dateRange: createDateRange(payload.dateRange),
          sourceId: payload.sourceId,
          url,
        } as DetailsTaskPayload);
        queued += 1;
      }
    } catch (error) {
      logger.error({ error, page }, "Failed to fetch WordPress page");
    }
  }

  return queued;
};

export const collectArticle = async (payload: DetailsTaskPayload): Promise<unknown> => {
  const source = resolveSourceConfig(payload.sourceId);
  const settings = resolveCrawlerConfig(source, {
    category: payload.category,
    dateRange: payload.dateRange ? formatDateRange(payload.dateRange) : undefined,
    pageRange: payload.pageRange ? formatPageRange(payload.pageRange) : undefined,
    sourceId: payload.sourceId,
  });
  const persistors = [
    new JsonlPersistor({
      directory: config.paths.data,
      sourceId: String(source.sourceId),
    }),
  ];

  if (source.sourceKind === SourceKindSchema.enum.html) {
    if (!payload.url) throw new Error("Missing article url");
    const crawler = new HtmlCrawler(settings, { persistors });
    const html = await crawler.crawl(payload.url);
    return await crawler.fetchOne(html, settings.dateRange);
  }

  if (source.sourceKind === SourceKindSchema.enum.wordpress) {
    const crawler = new WordPressCrawler(settings, { persistors });
    return await crawler.fetchOne(payload.data ?? {}, settings.dateRange);
  }

  throw new Error(`Unsupported source kind: ${source.sourceKind}`);
};

export const forwardForProcessing = async (payload: ProcessingTaskPayload): Promise<Article> => {
  logger.info({ article: payload.article.title }, "Ready for downstream processing");

  const client = new SyncHttpClient(config.fetch.client);
  const endpoint = env("BASANGO_CRAWLER_BACKEND_API_ENDPOINT");

  await client.post(endpoint, { json: payload.article });
  logger.info({ article: payload.article.title }, "Forwarded article to API");

  return payload.article;
};
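The `manager` default parameter above is an injection seam. A minimal sketch (editorial, not part of the commit; the stub shape and "example-source" id are hypothetical) of exercising it in a test:

import { collectHtmlListing } from "@/process/async/handlers";
import { QueueManager } from "@/process/async/queue";
import { DetailsTaskPayload } from "@/process/async/schemas";

const enqueued: DetailsTaskPayload[] = [];
// Partial stub: only the member collectHtmlListing touches, hence the cast.
const stubManager = {
  enqueueArticle: async (payload: DetailsTaskPayload) => {
    enqueued.push(payload);
    return { id: "test-job" };
  },
} as unknown as QueueManager;

// "example-source" must be resolvable by resolveSourceConfig in a real run.
const queued = await collectHtmlListing({ sourceId: "example-source" }, stubManager);
// queued === enqueued.length: one queue entry per extracted article link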
@@ -0,0 +1,107 @@ @/process/async/queue (module path inferred from imports elsewhere in this commit)
import { randomUUID } from "node:crypto";
import { JobsOptions, Queue, QueueOptions } from "bullmq";
import IORedis from "ioredis";
import { config, FetchAsyncConfig } from "@/config";
import {
  DetailsTaskPayload,
  DetailsTaskPayloadSchema,
  ListingTaskPayload,
  ListingTaskPayloadSchema,
  ProcessingTaskPayload,
  ProcessingTaskPayloadSchema,
} from "@/process/async/schemas";
import { parseRedisUrl } from "@/utils";

export interface QueueBackend<T = unknown> {
  add: (name: string, data: T, opts?: JobsOptions) => Promise<{ id: string }>;
}

export type QueueFactory = (
  queueName: string,
  settings: FetchAsyncConfig,
  connection?: IORedis,
) => QueueBackend;

const defaultQueueFactory: QueueFactory = (queueName, settings, connection) => {
  const redisConnection =
    connection ??
    new IORedis(settings.redisUrl, {
      ...parseRedisUrl(settings.redisUrl),
      maxRetriesPerRequest: null,
    });
  const options: QueueOptions = {
    connection: redisConnection,
    prefix: settings.prefix,
  };

  const queue = new Queue(queueName, options);
  return {
    add: async (name, data, opts) => {
      const job = await queue.add(name, data, {
        removeOnComplete: settings.ttl.result === 0 ? true : undefined,
        removeOnFail: settings.ttl.failure === 0 ? true : undefined,
        ...opts,
      });
      return { id: job.id ?? randomUUID() };
    },
  };
};

export interface CreateQueueManagerOptions {
  queueFactory?: QueueFactory;
  connection?: IORedis;
}

export interface QueueManager {
  readonly settings: FetchAsyncConfig;
  readonly connection: IORedis;
  enqueueListing: (payload: ListingTaskPayload) => Promise<{ id: string }>;
  enqueueArticle: (payload: DetailsTaskPayload) => Promise<{ id: string }>;
  enqueueProcessed: (payload: ProcessingTaskPayload) => Promise<{ id: string }>;
  iterQueueNames: () => string[];
  queueName: (suffix: string) => string;
  close: () => Promise<void>;
}

export const createQueueManager = (options: CreateQueueManagerOptions = {}): QueueManager => {
  const settings = config.fetch.async;

  const connection =
    options.connection ??
    new IORedis(settings.redisUrl, {
      ...parseRedisUrl(settings.redisUrl),
      maxRetriesPerRequest: null,
    });
  const factory = options.queueFactory ?? defaultQueueFactory;

  const ensureQueue = (queueName: string) => factory(queueName, settings, connection);

  return {
    close: async () => {
      await connection.quit();
    },
    connection,
    enqueueArticle: (payload) => {
      const data = DetailsTaskPayloadSchema.parse(payload);
      const queue = ensureQueue(settings.queues.details);
      return queue.add("collect_article", data);
    },
    enqueueListing: (payload) => {
      const data = ListingTaskPayloadSchema.parse(payload);
      const queue = ensureQueue(settings.queues.listing);
      return queue.add("collect_listing", data);
    },
    enqueueProcessed: (payload) => {
      const data = ProcessingTaskPayloadSchema.parse(payload);
      const queue = ensureQueue(settings.queues.processing);
      return queue.add("forward_for_processing", data);
    },
    iterQueueNames: () => [
      `${settings.prefix}:${settings.queues.listing}`,
      `${settings.prefix}:${settings.queues.details}`,
      `${settings.prefix}:${settings.queues.processing}`,
    ],
    queueName: (suffix: string) => `${settings.prefix}:${suffix}`,
    settings,
  };
};
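A sketch (editorial, not part of the commit) of the `queueFactory` seam: an in-memory `QueueBackend` that records jobs instead of writing to Redis. Note that `createQueueManager` still opens a real IORedis connection unless one is also injected via `options.connection`.

import { createQueueManager, QueueBackend, QueueFactory } from "@/process/async/queue";

const recorded: Array<{ queue: string; name: string; data: unknown }> = [];

const memoryFactory: QueueFactory = (queueName): QueueBackend => ({
  add: async (name, data) => {
    recorded.push({ queue: queueName, name, data });
    return { id: `mem-${recorded.length}` };
  },
});

const manager = createQueueManager({ queueFactory: memoryFactory });
await manager.enqueueListing({ sourceId: "example-source" }); // placeholder sourceId
// recorded[0].name === "collect_listing"; recorded[0].queue === settings.queues.listing
await manager.close();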
@@ -0,0 +1,28 @@ @/process/async/schemas (module path inferred from imports elsewhere in this commit)
import { z } from "zod";
import { ArticleSchema, DateRangeSchema, PageRangeSchema } from "@/schema";

export const ListingTaskPayloadSchema = z.object({
  category: z.string().optional(),
  dateRange: z.string().optional(),
  pageRange: z.string().optional(),
  sourceId: z.string(),
});

export const DetailsTaskPayloadSchema = z.object({
  category: z.string().optional(),
  data: z.any().optional(),
  dateRange: DateRangeSchema.optional(),
  page: z.number().int().nonnegative().optional(),
  pageRange: PageRangeSchema.optional(),
  sourceId: z.string(),
  url: z.url(),
});

export const ProcessingTaskPayloadSchema = z.object({
  article: ArticleSchema,
  sourceId: z.string(),
});

export type ListingTaskPayload = z.infer<typeof ListingTaskPayloadSchema>;
export type DetailsTaskPayload = z.infer<typeof DetailsTaskPayloadSchema>;
export type ProcessingTaskPayload = z.infer<typeof ProcessingTaskPayloadSchema>;
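These schemas are the validation gate for everything entering a queue: the manager's enqueue methods call `.parse` before `queue.add`. A quick illustration (editorial) using `safeParse`:

import { DetailsTaskPayloadSchema } from "@/process/async/schemas";

const ok = DetailsTaskPayloadSchema.safeParse({
  sourceId: "example-source",
  url: "https://example.com/articles/1",
});
// ok.success === true

const bad = DetailsTaskPayloadSchema.safeParse({
  sourceId: "example-source",
  url: "not-a-url",
});
// bad.success === false — an invalid url never reaches the details queue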
@@ -0,0 +1,60 @@ @/process/async/tasks (module path inferred from imports elsewhere in this commit)
import { logger } from "@basango/logger";
import * as handlers from "@/process/async/handlers";
import { createQueueManager } from "@/process/async/queue";
import {
  DetailsTaskPayloadSchema,
  ListingTaskPayloadSchema,
  ProcessingTaskPayloadSchema,
} from "@/process/async/schemas";
import { CrawlingOptions } from "@/process/crawler";

export const collectListing = async (payload: unknown): Promise<number> => {
  const data = ListingTaskPayloadSchema.parse(payload);
  logger.debug({ data }, "Collecting listing");

  // collectHtmlListing dispatches to the WordPress collector itself when the
  // source kind differs, so a single entry point suffices here.
  const count = await handlers.collectHtmlListing(data);
  logger.info({ count }, "Listing collection completed");

  return count;
};

export const collectArticle = async (payload: unknown): Promise<unknown> => {
  const data = DetailsTaskPayloadSchema.parse(payload);
  logger.info({ data }, "Collecting article");

  const result = await handlers.collectArticle(data);
  logger.info({ url: data.url }, "Article collection completed");

  return result;
};

export const forwardForProcessing = async (payload: unknown): Promise<unknown> => {
  const data = ProcessingTaskPayloadSchema.parse(payload);
  logger.debug({ sourceId: data.sourceId }, "Forwarding article for processing");

  const result = await handlers.forwardForProcessing(data);
  logger.info({ result }, "Article forwarded for processing");

  return result;
};

export const scheduleAsyncCrawl = async (options: CrawlingOptions): Promise<string> => {
  const payload = ListingTaskPayloadSchema.parse({
    category: options.category,
    dateRange: options.dateRange,
    pageRange: options.pageRange,
    sourceId: options.sourceId,
  });

  const manager = createQueueManager();
  logger.info({ payload }, "Scheduling listing collection job");

  try {
    const job = await manager.enqueueListing(payload);
    logger.info({ job }, "Scheduled listing collection job");

    return job.id;
  } finally {
    await manager.close();
  }
};
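A sketch of scheduling from a caller such as a CLI command (editorial; the option values are placeholders in whatever formats resolveCrawlerConfig expects):

import { scheduleAsyncCrawl } from "@/process/async/tasks";

const jobId = await scheduleAsyncCrawl({
  sourceId: "example-source",          // must resolve via resolveSourceConfig
  category: "politics",                // optional
  dateRange: "2024-01-01:2024-01-31",  // placeholder format
  pageRange: "1:5",                    // placeholder format
});
console.log(`listing job ${jobId} queued`);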
@@ -0,0 +1,74 @@ async worker module (path not shown in the diff)
import { QueueEvents, Worker } from "bullmq";
import IORedis from "ioredis";

import { createQueueManager, QueueFactory, QueueManager } from "@/process/async/queue";
import { collectArticle, collectListing, forwardForProcessing } from "@/process/async/tasks";

export interface WorkerOptions {
  queueNames?: string[];
  connection?: IORedis;
  queueFactory?: QueueFactory;
  concurrency?: number;
  onError?: (error: Error) => void;
  // Optional: when omitted, a manager is created internally and closed by the handle.
  queueManager?: QueueManager;
}

export interface WorkerHandle {
  readonly workers: Worker[];
  readonly events: QueueEvents[];
  close: () => Promise<void>;
}

export const startWorker = (options: WorkerOptions = {}): WorkerHandle => {
  const manager = options.queueManager ?? createQueueManager();
  // Bare queue names only: BullMQ expects the name and the key prefix separately,
  // and the prefix must match the one the producer Queues were created with.
  // (iterQueueNames() returns prefix-qualified display names, which would not match.)
  const queueNames = options.queueNames ?? [
    manager.settings.queues.listing,
    manager.settings.queues.details,
    manager.settings.queues.processing,
  ];
  const workers: Worker[] = [];
  const events: QueueEvents[] = [];

  const connection = manager.connection;
  const prefix = manager.settings.prefix;

  for (const queueName of queueNames) {
    const worker = new Worker(
      queueName,
      async (job) => {
        switch (job.name) {
          case "collect_listing":
            return collectListing(job.data);
          case "collect_article":
            return collectArticle(job.data);
          case "forward_for_processing":
            return forwardForProcessing(job.data);
          default:
            throw new Error(`Unknown job name: ${job.name}`);
        }
      },
      {
        concurrency: options.concurrency ?? 5,
        connection,
        prefix,
      },
    );

    if (options.onError) {
      worker.on("failed", (_job, err) => options.onError?.(err));
      worker.on("error", (err) => options.onError?.(err));
    }

    const queueEvents = new QueueEvents(queueName, { connection, prefix });

    workers.push(worker);
    events.push(queueEvents);
  }

  return {
    close: async () => {
      await Promise.all(workers.map((worker) => worker.close()));
      await Promise.all(events.map((event) => event.close()));

      // Only close a manager created here; a caller-supplied one stays open.
      if (!options.queueManager) {
        await manager.close();
      }
    },
    events,
    workers,
  };
};
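End-to-end wiring sketch (editorial): one process schedules, another consumes, sharing the Redis configuration. The signal handling is an assumption about how the service shuts down, and the worker module path is assumed.

import { logger } from "@basango/logger";
import { createQueueManager } from "@/process/async/queue";
import { startWorker } from "@/process/async/worker"; // module path assumed

const manager = createQueueManager();
const handle = startWorker({
  queueManager: manager,
  concurrency: 2,
  onError: (error) => logger.error({ error }, "Worker failure"),
});

process.once("SIGTERM", async () => {
  await handle.close();  // stops workers and queue-event listeners
  await manager.close(); // caller-supplied manager is closed by the caller
  process.exit(0);
});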