diff --git a/apps/crawler/package.json b/apps/crawler/package.json
new file mode 100644
index 0000000..3bfbb5e
--- /dev/null
+++ b/apps/crawler/package.json
@@ -0,0 +1,19 @@
+{
+  "name": "@basango/crawler",
+  "version": "0.1.0",
+  "private": true,
+  "type": "module",
+  "main": "dist/index.js",
+  "types": "dist/index.d.ts",
+  "scripts": {
+    "build": "tsc -b",
+    "test": "vitest --run"
+  },
+  "dependencies": {
+    "bullmq": "^4.17.0",
+    "date-fns": "^3.6.0",
+    "ioredis": "^5.3.2",
+    "tiktoken": "^1.0.14",
+    "zod": "^4.0.0"
+  }
+}
diff --git a/apps/crawler/src/config.test.ts b/apps/crawler/src/config.test.ts
new file mode 100644
index 0000000..14a5ea3
--- /dev/null
+++ b/apps/crawler/src/config.test.ts
@@ -0,0 +1,81 @@
+import fs from "node:fs";
+import os from "node:os";
+import path from "node:path";
+
+import { describe, expect, it } from "vitest";
+
+import { loadConfig } from "./config";
+import { resolveConfigPath } from "./schema";
+
+describe("loadConfig", () => {
+  it("parses json configuration and ensures directories", () => {
+    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "crawler-config-"));
+    const paths = {
+      root: tempDir,
+      data: path.join(tempDir, "data"),
+      logs: path.join(tempDir, "logs"),
+      configs: path.join(tempDir, "configs"),
+    };
+
+    const configPath = path.join(tempDir, "pipeline.json");
+    fs.writeFileSync(
+      configPath,
+      JSON.stringify(
+        {
+          paths,
+          fetch: {
+            client: { timeout: 10 },
+          },
+        },
+        null,
+        2,
+      ),
+    );
+
+    const config = loadConfig({ configPath });
+
+    expect(config.fetch.client.timeout).toBe(10);
+    expect(fs.existsSync(paths.data)).toBe(true);
+    expect(fs.existsSync(paths.logs)).toBe(true);
+    expect(fs.existsSync(paths.configs)).toBe(true);
+  });
+
+  it("merges environment override if available", () => {
+    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "crawler-config-"));
+    const paths = {
+      root: tempDir,
+      data: path.join(tempDir, "data"),
+      logs: path.join(tempDir, "logs"),
+      configs: path.join(tempDir, "configs"),
+    };
+
+    const basePath = path.join(tempDir, "pipeline.json");
+    fs.writeFileSync(
+      basePath,
+      JSON.stringify(
+        {
+          paths,
+          logging: { level: "INFO" },
+        },
+        null,
+        2,
+      ),
+    );
+
+    const overridePath = resolveConfigPath(basePath, "production");
+    fs.writeFileSync(
+      overridePath,
+      JSON.stringify(
+        {
+          logging: { level: "DEBUG" },
+        },
+        null,
+        2,
+      ),
+    );
+
+    const config = loadConfig({ configPath: basePath, env: "production" });
+
+    expect(config.logging.level).toBe("DEBUG");
+  });
+});
diff --git a/apps/crawler/src/config.ts b/apps/crawler/src/config.ts
new file mode 100644
index 0000000..70df6df
--- /dev/null
+++ b/apps/crawler/src/config.ts
@@ -0,0 +1,92 @@
+import fs from "node:fs";
+import path from "node:path";
+
+import {
+  PipelineConfig,
+  PipelineConfigSchema,
+  mergePipelineConfig,
+  resolveConfigPath,
+  resolveProjectPaths,
+} from "./schema";
+import { ensureDirectories } from "./utils";
+
+export interface LoadConfigOptions {
+  configPath?: string;
+  env?: string;
+}
+
+const DEFAULT_CONFIG_FILES = [
+  path.join(process.cwd(), "config", "pipeline.json"),
+  path.join(process.cwd(), "pipeline.json"),
+];
+
+const readJsonFile = (filePath: string): unknown => {
+  const contents = fs.readFileSync(filePath, "utf-8");
+  return contents.trim() === "" ? {} : JSON.parse(contents);
+};
+
+const locateConfigFile = (explicit?: string): string => {
+  if (explicit && fs.existsSync(explicit)) {
+    return explicit;
+  }
+
+  for (const candidate of DEFAULT_CONFIG_FILES) {
+    if (fs.existsSync(candidate)) {
+      return candidate;
+    }
+  }
+
+  return DEFAULT_CONFIG_FILES[0];
+};
+
+const readPipelineConfig = (configPath: string): PipelineConfig => {
+  if (!fs.existsSync(configPath)) {
+    return PipelineConfigSchema.parse({
+      paths: resolveProjectPaths(path.resolve(".")),
+    });
+  }
+
+  const raw = readJsonFile(configPath);
+  return PipelineConfigSchema.parse(raw);
+};
+
+const applyEnvironmentOverride = (
+  baseConfig: PipelineConfig,
+  basePath: string,
+  env?: string,
+): PipelineConfig => {
+  if (!env || env === "development") {
+    return baseConfig;
+  }
+
+  const overridePath = resolveConfigPath(basePath, env);
+  if (!fs.existsSync(overridePath)) {
+    return baseConfig;
+  }
+
+  // Parse overrides as a partial document so keys absent from the override
+  // file keep the base values instead of being reset to schema defaults.
+  const overrides = PipelineConfigSchema.partial().parse(
+    readJsonFile(overridePath),
+  ) as Partial<PipelineConfig>;
+  return mergePipelineConfig(baseConfig, overrides);
+};
+
+export const loadConfig = (options: LoadConfigOptions = {}): PipelineConfig => {
+  const basePath = locateConfigFile(options.configPath);
+  const config = applyEnvironmentOverride(
+    readPipelineConfig(basePath),
+    basePath,
+    options.env,
+  );
+
+  ensureDirectories(config.paths);
+  return config;
+};
+
+export const dumpConfig = (config: PipelineConfig, targetPath?: string): void => {
+  const destination = targetPath ?? locateConfigFile();
+  const normalized = PipelineConfigSchema.parse(config);
+  fs.mkdirSync(path.dirname(destination), { recursive: true });
+  fs.writeFileSync(destination, JSON.stringify(normalized, null, 2));
+};
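A minimal usage sketch of the loader (not part of the patch; the override file name `pipeline.production.json` follows `resolveConfigPath`'s convention, and the values shown are illustrative):

```ts
import { loadConfig } from "@basango/crawler";

// Reads config/pipeline.json (or ./pipeline.json), layers
// pipeline.production.json on top because env !== "development",
// and creates the data/logs/configs directories as a side effect.
const config = loadConfig({ env: "production" });
console.log(config.logging.level, config.paths.data);
```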
z.enum(["forward", "backward"]); +export type UpdateDirection = z.infer; + +export const SourceKindSchema = z.enum(["wordpress", "html"]); +export type SourceKind = z.infer; + +export const SourceDateSchema = z.object({ + format: z.string().default("yyyy-LL-dd HH:mm"), + pattern: z.string().nullable().optional(), + replacement: z.string().nullable().optional(), +}); +export type SourceDate = z.infer; + +export const SourceSelectorsSchema = z.object({ + articles: z.string().optional().nullable(), + article_title: z.string().optional().nullable(), + article_link: z.string().optional().nullable(), + article_body: z.string().optional().nullable(), + article_date: z.string().optional().nullable(), + article_categories: z.string().optional().nullable(), + pagination: z.string().default("ul.pagination > li a"), +}); +export type SourceSelectors = z.infer; + +const BaseSourceSchema = z.object({ + source_id: z.string(), + source_url: z.string().url(), + source_date: SourceDateSchema.default(SourceDateSchema.parse({})), + source_kind: SourceKindSchema, + categories: z.array(z.string()).default([]), + supports_categories: z.boolean().default(false), + requires_details: z.boolean().default(false), + requires_rate_limit: z.boolean().default(false), +}); + +export const HtmlSourceConfigSchema = BaseSourceSchema.extend({ + source_kind: z.literal("html"), + source_selectors: SourceSelectorsSchema.default(SourceSelectorsSchema.parse({})), + pagination_template: z.string(), +}); + +export const WordPressSourceConfigSchema = BaseSourceSchema.extend({ + source_kind: z.literal("wordpress"), + source_date: SourceDateSchema.default( + SourceDateSchema.parse({ format: "yyyy-LL-dd'T'HH:mm:ss" }), + ), +}); + +export type HtmlSourceConfig = z.infer; +export type WordPressSourceConfig = z.infer; +export type AnySourceConfig = HtmlSourceConfig | WordPressSourceConfig; + +export const DateRangeSchema = z + .object({ + start: z.number().int(), + end: z.number().int(), + }) + .superRefine((value, ctx) => { + if (value.start === 0 || value.end === 0) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "Timestamp cannot be zero", + }); + } + if (value.end < value.start) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "End timestamp must be greater than or equal to start", + }); + } + }); + +export type DateRange = z.infer; + +export const PageRangeSchema = z + .object({ + start: z.number().int().min(0), + end: z.number().int().min(0), + }) + .superRefine((value, ctx) => { + if (value.end < value.start) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "End page must be greater than or equal to start page", + }); + } + }); + +export type PageRange = z.infer; + +export const PageRangeSpecSchema = z + .string() + .regex(/^[0-9]+:[0-9]+$/, "Invalid page range format. 
Use start:end") + .transform((spec) => { + const [startText, endText] = spec.split(":"); + return { + start: Number.parseInt(startText, 10), + end: Number.parseInt(endText, 10), + }; + }); + +const defaultDateFormat = "yyyy-LL-dd"; + +export const DateRangeSpecSchema = z + .string() + .regex(/.+:.+/, "Expected start:end format") + .transform((spec) => { + const [startRaw, endRaw] = spec.split(":"); + return { startRaw, endRaw }; + }); + +const parseDate = (value: string, format: string): Date => { + if (!isMatch(value, format)) { + throw new Error(`Invalid date '${value}' for format '${format}'`); + } + const parsed = parse(value, format, new Date()); + if (Number.isNaN(parsed.getTime())) { + throw new Error(`Invalid date '${value}' for format '${format}'`); + } + return parsed; +}; + +export interface CreateDateRangeOptions { + format?: string; + separator?: string; +} + +export const createDateRange = ( + spec: string, + options: CreateDateRangeOptions = {}, +): DateRange => { + const { format = defaultDateFormat, separator = ":" } = options; + if (!separator) { + throw new Error("Separator cannot be empty"); + } + + const normalized = spec.replace(separator, ":"); + const parsedSpec = DateRangeSpecSchema.parse(normalized); + + const startDate = parseDate(parsedSpec.startRaw, format); + const endDate = parseDate(parsedSpec.endRaw, format); + + const range = { + start: getUnixTime(startDate), + end: getUnixTime(endDate), + }; + + return DateRangeSchema.parse(range); +}; + +export const formatDateRange = (range: DateRange, fmt = defaultDateFormat): string => { + const start = formatDate(new Date(range.start * 1000), fmt); + const end = formatDate(new Date(range.end * 1000), fmt); + return `${start}:${end}`; +}; + +export const isTimestampInRange = (range: DateRange, timestamp: number): boolean => { + return range.start <= timestamp && timestamp <= range.end; +}; + +export const ProjectPathsSchema = z.object({ + root: z.string(), + data: z.string(), + logs: z.string(), + configs: z.string(), +}); +export type ProjectPaths = z.infer; + +export const resolveProjectPaths = (rootDir: string): ProjectPaths => { + return ProjectPathsSchema.parse({ + root: rootDir, + data: path.join(rootDir, "data", "dataset"), + logs: path.join(rootDir, "data", "logs"), + configs: path.join(rootDir, "config"), + }); +}; + +export const LoggingConfigSchema = z.object({ + level: z.string().default("INFO"), + format: z + .string() + .default("%(asctime)s - %(name)s - %(levelname)s - %(message)s"), + console_logging: z.boolean().default(true), + file_logging: z.boolean().default(false), + log_file: z.string().default("crawler.log"), + max_log_size: z.number().int().positive().default(10 * 1024 * 1024), + backup_count: z.number().int().nonnegative().default(5), +}); +export type LoggingConfig = z.infer; + +export const ClientConfigSchema = z.object({ + timeout: z.number().positive().default(20), + user_agent: z + .string() + .default("Basango/0.1 (+https://github.com/bernard-ng/basango)"), + follow_redirects: z.boolean().default(true), + verify_ssl: z.boolean().default(true), + rotate: z.boolean().default(true), + max_retries: z.number().int().nonnegative().default(3), + backoff_initial: z.number().nonnegative().default(1), + backoff_multiplier: z.number().positive().default(2), + backoff_max: z.number().nonnegative().default(30), + respect_retry_after: z.boolean().default(true), +}); + +export const CrawlerConfigSchema = z.object({ + source: z.union([HtmlSourceConfigSchema, WordPressSourceConfigSchema]).optional(), + 
page_range: PageRangeSchema.optional(), + date_range: DateRangeSchema.optional(), + category: z.string().optional(), + notify: z.boolean().default(false), + is_update: z.boolean().default(false), + use_multi_threading: z.boolean().default(false), + max_workers: z.number().int().positive().default(5), + direction: UpdateDirectionSchema.default("forward"), +}); + +export type ClientConfig = z.infer; +export type CrawlerConfig = z.infer & { + source?: AnySourceConfig; +}; + +export const FetchConfigSchema = z.object({ + client: ClientConfigSchema.default(ClientConfigSchema.parse({})), + crawler: CrawlerConfigSchema.default(CrawlerConfigSchema.parse({})), +}); +export type FetchConfig = z.infer; + +const SourcesConfigSchema = z.object({ + html: z.array(HtmlSourceConfigSchema).default([]), + wordpress: z.array(WordPressSourceConfigSchema).default([]), +}); + +export type SourcesConfig = z.infer & { + find: (sourceId: string) => AnySourceConfig | undefined; +}; + +export const createSourcesConfig = (input: unknown): SourcesConfig => { + const parsed = SourcesConfigSchema.parse(input); + const resolver = (sourceId: string) => + [...parsed.html, ...parsed.wordpress].find((source) => source.source_id === sourceId); + return Object.assign({ find: resolver }, parsed); +}; + +export const PipelineConfigSchema = z.object({ + paths: ProjectPathsSchema.default(resolveProjectPaths(process.cwd())), + logging: LoggingConfigSchema.default(LoggingConfigSchema.parse({})), + fetch: FetchConfigSchema.default(FetchConfigSchema.parse({})), + sources: z + .union([SourcesConfigSchema, z.undefined()]) + .transform((value) => createSourcesConfig(value ?? {})), +}); + +export type PipelineConfig = z.infer & { + sources: SourcesConfig; +}; + +export const mergePipelineConfig = ( + base: PipelineConfig, + overrides: Partial, +): PipelineConfig => { + const paths = overrides.paths ?? base.paths; + const logging = { ...base.logging, ...(overrides.logging ?? {}) }; + const fetch = { + client: { ...base.fetch.client, ...(overrides.fetch?.client ?? {}) }, + crawler: { ...base.fetch.crawler, ...(overrides.fetch?.crawler ?? {}) }, + }; + + const sources = createSourcesConfig({ + html: overrides.sources?.html ?? base.sources.html, + wordpress: overrides.sources?.wordpress ?? 
base.sources.wordpress, + }); + + return { + paths, + logging, + fetch, + sources, + }; +}; + +export const resolveConfigPath = (basePath: string, env?: string): string => { + if (!env || env === "development") { + return basePath; + } + + const ext = path.extname(basePath); + const withoutExt = basePath.slice(0, basePath.length - ext.length); + return `${withoutExt}.${env}${ext}`; +}; + +export const schemaToJSON = (schema: T) => schema.toJSON(); diff --git a/apps/crawler/src/services/crawler/async/queue.test.ts b/apps/crawler/src/services/crawler/async/queue.test.ts new file mode 100644 index 0000000..2d971c4 --- /dev/null +++ b/apps/crawler/src/services/crawler/async/queue.test.ts @@ -0,0 +1,58 @@ +import { describe, expect, it } from "vitest"; + +import { createQueueManager, createQueueSettings } from "./queue"; + +class InMemoryQueue { + public jobs: Array<{ name: string; data: unknown }> = []; + + async add(name: string, data: unknown) { + this.jobs.push({ name, data }); + return { id: `${name}-${this.jobs.length}` }; + } +} + +describe("createQueueManager", () => { + it("prefixes queue names", () => { + const manager = createQueueManager({ + settings: createQueueSettings({ prefix: "test" }), + queueFactory: (queueName) => { + expect(queueName).toBe("listing"); + return new InMemoryQueue(); + }, + connection: { + quit: async () => undefined, + } as any, + }); + + expect(manager.iterQueueNames()).toEqual([ + "test:listing", + "test:articles", + "test:processed", + ]); + }); + + it("enqueues listing job with validated payload", async () => { + const queue = new InMemoryQueue(); + const manager = createQueueManager({ + queueFactory: () => queue, + connection: { quit: async () => undefined } as any, + }); + + const job = await manager.enqueueListing({ + source_id: "radiookapi", + env: "test", + }); + + expect(job.id).toBe("collect_listing-1"); + expect(queue.jobs[0]).toEqual({ + name: "collect_listing", + data: { + source_id: "radiookapi", + env: "test", + page_range: undefined, + date_range: undefined, + category: undefined, + }, + }); + }); +}); diff --git a/apps/crawler/src/services/crawler/async/queue.ts b/apps/crawler/src/services/crawler/async/queue.ts new file mode 100644 index 0000000..35ed193 --- /dev/null +++ b/apps/crawler/src/services/crawler/async/queue.ts @@ -0,0 +1,118 @@ +import { randomUUID } from "node:crypto"; + +import IORedis from "ioredis"; +import { Queue, JobsOptions, QueueOptions } from "bullmq"; +import { z } from "zod"; + +import { + ListingTaskPayload, + ArticleTaskPayload, + ProcessedTaskPayload, + ListingTaskPayloadSchema, + ArticleTaskPayloadSchema, + ProcessedTaskPayloadSchema, +} from "./schemas"; +import { parseRedisUrl } from "../../../utils"; + +const QueueSettingsSchema = z.object({ + redis_url: z.string().default(process.env.BASANGO_REDIS_URL ?? "redis://localhost:6379/0"), + prefix: z.string().default(process.env.BASANGO_QUEUE_PREFIX ?? "crawler"), + default_timeout: z.number().int().positive().default(Number(process.env.BASANGO_QUEUE_TIMEOUT ?? 600)), + result_ttl: z.number().int().nonnegative().default(Number(process.env.BASANGO_QUEUE_RESULT_TTL ?? 3600)), + failure_ttl: z.number().int().nonnegative().default(Number(process.env.BASANGO_QUEUE_FAILURE_TTL ?? 
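To make the schema concrete, a sketch of a document `PipelineConfigSchema` accepts (not part of the patch; every omitted field is filled by a schema default, and the source entry is illustrative rather than a real site definition):

```ts
import { PipelineConfigSchema } from "@basango/crawler";

const config = PipelineConfigSchema.parse({
  logging: { level: "DEBUG", file_logging: true },
  fetch: { client: { timeout: 30, max_retries: 5 } },
  sources: {
    wordpress: [
      {
        source_id: "radiookapi",
        source_url: "https://example.org",
        source_kind: "wordpress",
      },
    ],
  },
});

// The sources transform attaches a lookup helper.
const source = config.sources.find("radiookapi");
```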
diff --git a/apps/crawler/src/services/crawler/async/queue.test.ts b/apps/crawler/src/services/crawler/async/queue.test.ts
new file mode 100644
index 0000000..2d971c4
--- /dev/null
+++ b/apps/crawler/src/services/crawler/async/queue.test.ts
@@ -0,0 +1,58 @@
+import { describe, expect, it } from "vitest";
+
+import { createQueueManager, createQueueSettings } from "./queue";
+
+class InMemoryQueue {
+  public jobs: Array<{ name: string; data: unknown }> = [];
+
+  async add(name: string, data: unknown) {
+    this.jobs.push({ name, data });
+    return { id: `${name}-${this.jobs.length}` };
+  }
+}
+
+describe("createQueueManager", () => {
+  it("prefixes queue names", () => {
+    const manager = createQueueManager({
+      settings: createQueueSettings({ prefix: "test" }),
+      queueFactory: (queueName) => {
+        expect(queueName).toBe("listing");
+        return new InMemoryQueue();
+      },
+      connection: {
+        quit: async () => undefined,
+      } as any,
+    });
+
+    expect(manager.iterQueueNames()).toEqual([
+      "test:listing",
+      "test:articles",
+      "test:processed",
+    ]);
+  });
+
+  it("enqueues listing job with validated payload", async () => {
+    const queue = new InMemoryQueue();
+    const manager = createQueueManager({
+      queueFactory: () => queue,
+      connection: { quit: async () => undefined } as any,
+    });
+
+    const job = await manager.enqueueListing({
+      source_id: "radiookapi",
+      env: "test",
+    });
+
+    expect(job.id).toBe("collect_listing-1");
+    expect(queue.jobs[0]).toEqual({
+      name: "collect_listing",
+      data: {
+        source_id: "radiookapi",
+        env: "test",
+        page_range: undefined,
+        date_range: undefined,
+        category: undefined,
+      },
+    });
+  });
+});
diff --git a/apps/crawler/src/services/crawler/async/queue.ts b/apps/crawler/src/services/crawler/async/queue.ts
new file mode 100644
index 0000000..35ed193
--- /dev/null
+++ b/apps/crawler/src/services/crawler/async/queue.ts
@@ -0,0 +1,119 @@
+import { randomUUID } from "node:crypto";
+
+import IORedis from "ioredis";
+import { Queue, JobsOptions, QueueOptions } from "bullmq";
+import { z } from "zod";
+
+import {
+  ListingTaskPayload,
+  ArticleTaskPayload,
+  ProcessedTaskPayload,
+  ListingTaskPayloadSchema,
+  ArticleTaskPayloadSchema,
+  ProcessedTaskPayloadSchema,
+} from "./schemas";
+import { parseRedisUrl } from "../../../utils";
+
+const QueueSettingsSchema = z.object({
+  redis_url: z.string().default(process.env.BASANGO_REDIS_URL ?? "redis://localhost:6379/0"),
+  prefix: z.string().default(process.env.BASANGO_QUEUE_PREFIX ?? "crawler"),
+  default_timeout: z.number().int().positive().default(Number(process.env.BASANGO_QUEUE_TIMEOUT ?? 600)),
+  result_ttl: z.number().int().nonnegative().default(Number(process.env.BASANGO_QUEUE_RESULT_TTL ?? 3600)),
+  failure_ttl: z.number().int().nonnegative().default(Number(process.env.BASANGO_QUEUE_FAILURE_TTL ?? 3600)),
+  listing_queue: z.string().default("listing"),
+  article_queue: z.string().default("articles"),
+  processed_queue: z.string().default("processed"),
+});
+
+export type QueueSettingsInput = z.input<typeof QueueSettingsSchema>;
+export type QueueSettings = z.output<typeof QueueSettingsSchema>;
+
+export const createQueueSettings = (input?: QueueSettingsInput): QueueSettings =>
+  QueueSettingsSchema.parse(input ?? {});
+
+export interface QueueBackend<T = unknown> {
+  add: (name: string, data: T, opts?: JobsOptions) => Promise<{ id: string }>;
+}
+
+export type QueueFactory = (
+  queueName: string,
+  settings: QueueSettings,
+  connection?: IORedis,
+) => QueueBackend;
+
+const defaultQueueFactory: QueueFactory = (queueName, settings, connection) => {
+  const redisConnection = connection ?? new IORedis(settings.redis_url, parseRedisUrl(settings.redis_url));
+  const options: QueueOptions = {
+    connection: redisConnection,
+    prefix: settings.prefix,
+  };
+
+  const queue = new Queue(queueName, options);
+  return {
+    add: async (name, data, opts) => {
+      // BullMQ has no per-job timeout option, so default_timeout is not
+      // forwarded here; the TTL settings map onto job retention instead.
+      const job = await queue.add(name, data, {
+        removeOnComplete: settings.result_ttl === 0 ? true : { age: settings.result_ttl },
+        removeOnFail: settings.failure_ttl === 0 ? true : { age: settings.failure_ttl },
+        ...opts,
+      });
+      return { id: job.id ?? randomUUID() };
+    },
+  };
+};
+
+export interface CreateQueueManagerOptions {
+  settings?: QueueSettings | QueueSettingsInput;
+  queueFactory?: QueueFactory;
+  connection?: IORedis;
+}
+
+export interface QueueManager {
+  readonly settings: QueueSettings;
+  readonly connection: IORedis;
+  enqueueListing: (payload: ListingTaskPayload) => Promise<{ id: string }>;
+  enqueueArticle: (payload: ArticleTaskPayload) => Promise<{ id: string }>;
+  enqueueProcessed: (payload: ProcessedTaskPayload) => Promise<{ id: string }>;
+  iterQueueNames: () => string[];
+  queueName: (suffix: string) => string;
+  close: () => Promise<void>;
+}
+
+export const createQueueManager = (options: CreateQueueManagerOptions = {}): QueueManager => {
+  const settings = createQueueSettings(options.settings as QueueSettingsInput | undefined);
+
+  const connection = options.connection ?? new IORedis(settings.redis_url, parseRedisUrl(settings.redis_url));
+  const factory = options.queueFactory ?? defaultQueueFactory;
+
+  const ensureQueue = (queueName: string) => factory(queueName, settings, connection);
+
+  return {
+    settings,
+    connection,
+    enqueueListing: (payload) => {
+      const data = ListingTaskPayloadSchema.parse(payload);
+      const queue = ensureQueue(settings.listing_queue);
+      return queue.add("collect_listing", data);
+    },
+    enqueueArticle: (payload) => {
+      const data = ArticleTaskPayloadSchema.parse(payload);
+      const queue = ensureQueue(settings.article_queue);
+      return queue.add("collect_article", data);
+    },
+    enqueueProcessed: (payload) => {
+      const data = ProcessedTaskPayloadSchema.parse(payload);
+      const queue = ensureQueue(settings.processed_queue);
+      return queue.add("forward_for_processing", data);
+    },
+    iterQueueNames: () => [
+      `${settings.prefix}:${settings.listing_queue}`,
+      `${settings.prefix}:${settings.article_queue}`,
+      `${settings.prefix}:${settings.processed_queue}`,
+    ],
+    queueName: (suffix: string) => `${settings.prefix}:${suffix}`,
+    close: async () => {
+      await connection.quit();
+    },
+  };
+};
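A producer-side sketch against a real Redis instance (not part of the patch; assumes Redis is reachable at the default `redis_url`, with unlisted settings keeping their schema defaults):

```ts
import { createQueueManager } from "@basango/crawler";

const manager = createQueueManager({ settings: { prefix: "crawler" } });

// The payload is validated by ListingTaskPayloadSchema before enqueueing.
const job = await manager.enqueueListing({
  source_id: "radiookapi",
  env: "production",
  page_range: "1:10",
});

console.log(job.id, manager.iterQueueNames());
await manager.close();
```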
diff --git a/apps/crawler/src/services/crawler/async/schemas.ts b/apps/crawler/src/services/crawler/async/schemas.ts
new file mode 100644
index 0000000..bb13743
--- /dev/null
+++ b/apps/crawler/src/services/crawler/async/schemas.ts
@@ -0,0 +1,37 @@
+import { z } from "zod";
+
+import { AnySourceConfig, DateRangeSchema, PageRangeSchema } from "../../../schema";
+
+export const ListingTaskPayloadSchema = z.object({
+  source_id: z.string(),
+  env: z.string().default("development"),
+  page_range: z.string().optional().nullable(),
+  date_range: z.string().optional().nullable(),
+  category: z.string().optional().nullable(),
+});
+
+export type ListingTaskPayload = z.infer<typeof ListingTaskPayloadSchema>;
+
+export const ArticleTaskPayloadSchema = z.object({
+  source_id: z.string(),
+  env: z.string().default("development"),
+  url: z.string().url(),
+  page: z.number().int().nonnegative().optional(),
+  page_range: PageRangeSchema.optional().nullable(),
+  date_range: DateRangeSchema.optional().nullable(),
+  category: z.string().optional().nullable(),
+});
+
+export type ArticleTaskPayload = z.infer<typeof ArticleTaskPayloadSchema>;
+
+export const ProcessedTaskPayloadSchema = z.object({
+  source_id: z.string(),
+  env: z.string().default("development"),
+  article: z.any(),
+});
+
+export type ProcessedTaskPayload = z.infer<typeof ProcessedTaskPayloadSchema>;
+
+export interface ListingContext {
+  source: AnySourceConfig;
+}
diff --git a/apps/crawler/src/services/crawler/async/tasks.test.ts b/apps/crawler/src/services/crawler/async/tasks.test.ts
new file mode 100644
index 0000000..6ae237d
--- /dev/null
+++ b/apps/crawler/src/services/crawler/async/tasks.test.ts
@@ -0,0 +1,46 @@
+import { describe, expect, it, vi } from "vitest";
+
+import { scheduleAsyncCrawl, registerCrawlerTaskHandlers, collectListing } from "./tasks";
+import { QueueManager } from "./queue";
+
+describe("Async tasks", () => {
+  it("schedules crawl with provided manager", async () => {
+    const enqueueListing = vi.fn().mockResolvedValue({ id: "job-1" });
+    const manager = {
+      enqueueListing,
+    } as unknown as QueueManager;
+
+    const jobId = await scheduleAsyncCrawl({
+      sourceId: "radiookapi",
+      queueManager: manager,
+    });
+
+    expect(jobId).toBe("job-1");
+    expect(enqueueListing).toHaveBeenCalledWith({
+      source_id: "radiookapi",
+      env: "development",
+      page_range: undefined,
+      date_range: undefined,
+      category: undefined,
+    });
+  });
+
+  it("delegates listing collection to registered handler", async () => {
+    const handler = vi.fn().mockResolvedValue(5);
+    registerCrawlerTaskHandlers({ collectListing: handler });
+
+    const count = await collectListing({
+      source_id: "radiookapi",
+      env: "development",
+    });
+
+    expect(count).toBe(5);
+    expect(handler).toHaveBeenCalledWith({
+      source_id: "radiookapi",
+      env: "development",
+      page_range: undefined,
+      date_range: undefined,
+      category: undefined,
+    });
+  });
+});
diff --git a/apps/crawler/src/services/crawler/async/tasks.ts b/apps/crawler/src/services/crawler/async/tasks.ts
new file mode 100644
index 0000000..d81f571
--- /dev/null
+++ b/apps/crawler/src/services/crawler/async/tasks.ts
@@ -0,0 +1,88 @@
+import {
+  ListingTaskPayloadSchema,
+  ArticleTaskPayloadSchema,
+  ProcessedTaskPayloadSchema,
+  ListingTaskPayload,
+  ArticleTaskPayload,
+  ProcessedTaskPayload,
+} from "./schemas";
+import {
+  createQueueManager,
+  QueueManager,
+  QueueSettings,
+  QueueSettingsInput,
+} from "./queue";
+
+export interface CrawlerTaskHandlers {
+  collectListing: (payload: ListingTaskPayload) => Promise<number> | number;
+  collectArticle: (payload: ArticleTaskPayload) => Promise<unknown> | unknown;
+  forwardForProcessing: (payload: ProcessedTaskPayload) => Promise<unknown> | unknown;
+}
+
+const notImplemented = (name: keyof CrawlerTaskHandlers) => () => {
+  throw new Error(`Crawler task handler '${name}' is not implemented`);
+};
+
+let handlers: CrawlerTaskHandlers = {
+  collectListing: notImplemented("collectListing"),
+  collectArticle: notImplemented("collectArticle"),
+  forwardForProcessing: notImplemented("forwardForProcessing"),
+};
+
+export const registerCrawlerTaskHandlers = (overrides: Partial<CrawlerTaskHandlers>): void => {
+  handlers = { ...handlers, ...overrides };
+};
+
+export interface ScheduleAsyncCrawlOptions {
+  sourceId: string;
+  env?: string;
+  pageRange?: string | null;
+  dateRange?: string | null;
+  category?: string | null;
+  settings?: QueueSettings | QueueSettingsInput;
+  queueManager?: QueueManager;
+}
+
+export const scheduleAsyncCrawl = async ({
+  sourceId,
+  env = "development",
+  pageRange,
+  dateRange,
+  category,
+  settings,
+  queueManager,
+}: ScheduleAsyncCrawlOptions): Promise<string> => {
+  const payload = ListingTaskPayloadSchema.parse({
+    source_id: sourceId,
+    env,
+    page_range: pageRange ?? undefined,
+    date_range: dateRange ?? undefined,
+    category: category ?? undefined,
+  });
+
+  const manager = queueManager ?? createQueueManager({ settings });
+  try {
+    const job = await manager.enqueueListing(payload);
+    return job.id;
+  } finally {
+    if (!queueManager) {
+      await manager.close();
+    }
+  }
+};
+
+export const collectListing = async (payload: unknown): Promise<number> => {
+  const data = ListingTaskPayloadSchema.parse(payload);
+  const result = await handlers.collectListing(data);
+  return typeof result === "number" ? result : 0;
+};
+
+export const collectArticle = async (payload: unknown): Promise<unknown> => {
+  const data = ArticleTaskPayloadSchema.parse(payload);
+  return handlers.collectArticle(data);
+};
+
+export const forwardForProcessing = async (payload: unknown): Promise<unknown> => {
+  const data = ProcessedTaskPayloadSchema.parse(payload);
+  return handlers.forwardForProcessing(data);
+};
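A one-shot scheduling sketch using the convenience wrapper above (not part of the patch; assumes the same local Redis, and the short-lived manager it creates is closed automatically):

```ts
import { scheduleAsyncCrawl } from "@basango/crawler";

const jobId = await scheduleAsyncCrawl({
  sourceId: "radiookapi",
  env: "production",
  dateRange: "2024-01-01:2024-01-31",
});
console.log(`scheduled listing crawl ${jobId}`);
```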
diff --git a/apps/crawler/src/services/crawler/async/worker.ts b/apps/crawler/src/services/crawler/async/worker.ts
new file mode 100644
index 0000000..adbc506
--- /dev/null
+++ b/apps/crawler/src/services/crawler/async/worker.ts
@@ -0,0 +1,92 @@
+import IORedis from "ioredis";
+import { Worker, QueueEvents } from "bullmq";
+
+import {
+  createQueueManager,
+  QueueFactory,
+  QueueManager,
+  QueueSettings,
+  QueueSettingsInput,
+} from "./queue";
+import { collectArticle, collectListing, forwardForProcessing } from "./tasks";
+
+export interface WorkerOptions {
+  queueNames?: string[];
+  settings?: QueueSettings | QueueSettingsInput;
+  connection?: IORedis;
+  queueFactory?: QueueFactory;
+  concurrency?: number;
+  onError?: (error: Error) => void;
+  queueManager?: QueueManager;
+}
+
+export interface WorkerHandle {
+  readonly workers: Worker[];
+  readonly events: QueueEvents[];
+  close: () => Promise<void>;
+}
+
+export const startWorker = (options: WorkerOptions = {}): WorkerHandle => {
+  const manager =
+    options.queueManager ??
+    createQueueManager({
+      settings: options.settings,
+      connection: options.connection,
+      queueFactory: options.queueFactory,
+    });
+
+  const settings = manager.settings;
+  // Queues are created with an explicit key prefix, so workers subscribe with
+  // the bare queue names plus the same prefix; the colon-joined names from
+  // iterQueueNames would not match the queue keys.
+  const queueNames = options.queueNames ?? [
+    settings.listing_queue,
+    settings.article_queue,
+    settings.processed_queue,
+  ];
+  const workers: Worker[] = [];
+  const events: QueueEvents[] = [];
+
+  const connection = manager.connection;
+
+  for (const queueName of queueNames) {
+    const worker = new Worker(queueName, async (job) => {
+      switch (job.name) {
+        case "collect_listing":
+          return collectListing(job.data);
+        case "collect_article":
+          return collectArticle(job.data);
+        case "forward_for_processing":
+          return forwardForProcessing(job.data);
+        default:
+          throw new Error(`Unknown job name: ${job.name}`);
+      }
+    }, {
+      connection,
+      prefix: settings.prefix,
+      concurrency: options.concurrency ?? 5,
+    });
+
+    if (options.onError) {
+      worker.on("failed", (_, err) => options.onError?.(err as Error));
+      worker.on("error", (err) => options.onError?.(err as Error));
+    }
+
+    const queueEvents = new QueueEvents(queueName, { connection, prefix: settings.prefix });
+
+    workers.push(worker);
+    events.push(queueEvents);
+  }
+
+  return {
+    workers,
+    events,
+    close: async () => {
+      await Promise.all(workers.map((worker) => worker.close()));
+      await Promise.all(events.map((event) => event.close()));
+      if (!options.queueManager) {
+        await manager.close();
+      }
+    },
+  };
+};
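A sketch of a worker entrypoint (not part of the patch; real handlers must be registered before jobs arrive, otherwise the `notImplemented` defaults throw and the jobs fail):

```ts
import { registerCrawlerTaskHandlers, startWorker } from "@basango/crawler";

registerCrawlerTaskHandlers({
  // Hypothetical handler: fetch the listing pages for the source and
  // return how many articles were discovered.
  collectListing: async (payload) => {
    console.log(`collecting listing for ${payload.source_id}`);
    return 0;
  },
});

const handle = startWorker({ concurrency: 2 });
process.on("SIGINT", () => {
  void handle.close().then(() => process.exit(0));
});
```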
+ "outDir": "dist" + } +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..a5c2ecc --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "./tsconfig.base.json", + "files": [], + "references": [ + { "path": "./apps/crawler" } + ] +}