Merge pull request #3 from bernard-ng/codex/port-python-crawler-to-typescript

refactor: centralize crawler schemas and json config
Authored by Bernard Ngandu on 2025-10-28 06:44:45 +02:00, committed by GitHub
19 changed files with 1052 additions and 0 deletions
+19
@@ -0,0 +1,19 @@
{
"name": "@basango/crawler",
"version": "0.1.0",
"private": true,
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc -b",
"test": "vitest --run"
},
"dependencies": {
"bullmq": "^4.17.0",
"date-fns": "^3.6.0",
"ioredis": "^5.3.2",
"tiktoken": "^1.0.14",
"zod": "^4.0.0"
}
}
+81
@@ -0,0 +1,81 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { loadConfig } from "./config";
import { resolveConfigPath } from "./schema";
describe("loadConfig", () => {
it("parses json configuration and ensures directories", () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "crawler-config-"));
const paths = {
root: tempDir,
data: path.join(tempDir, "data"),
logs: path.join(tempDir, "logs"),
configs: path.join(tempDir, "configs"),
};
const configPath = path.join(tempDir, "pipeline.json");
fs.writeFileSync(
configPath,
JSON.stringify(
{
paths,
fetch: {
client: { timeout: 10 },
},
},
null,
2,
),
);
const config = loadConfig({ configPath });
expect(config.fetch.client.timeout).toBe(10);
expect(fs.existsSync(paths.data)).toBe(true);
expect(fs.existsSync(paths.logs)).toBe(true);
expect(fs.existsSync(paths.configs)).toBe(true);
});
it("merges environment override if available", () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "crawler-config-"));
const paths = {
root: tempDir,
data: path.join(tempDir, "data"),
logs: path.join(tempDir, "logs"),
configs: path.join(tempDir, "configs"),
};
const basePath = path.join(tempDir, "pipeline.json");
fs.writeFileSync(
basePath,
JSON.stringify(
{
paths,
logging: { level: "INFO" },
},
null,
2,
),
);
const overridePath = resolveConfigPath(basePath, "production");
fs.writeFileSync(
overridePath,
JSON.stringify(
{
logging: { level: "DEBUG" },
},
null,
2,
),
);
const config = loadConfig({ configPath: basePath, env: "production" });
expect(config.logging.level).toBe("DEBUG");
});
});
+88
@@ -0,0 +1,88 @@
import fs from "node:fs";
import path from "node:path";
import {
PipelineConfig,
PipelineConfigSchema,
mergePipelineConfig,
resolveConfigPath,
resolveProjectPaths,
} from "./schema";
import { ensureDirectories } from "./utils";
export interface LoadConfigOptions {
configPath?: string;
env?: string;
}
const DEFAULT_CONFIG_FILES = [
path.join(process.cwd(), "config", "pipeline.json"),
path.join(process.cwd(), "pipeline.json"),
];
const readJsonFile = (filePath: string): unknown => {
const contents = fs.readFileSync(filePath, "utf-8");
return contents.trim() === "" ? {} : JSON.parse(contents);
};
const locateConfigFile = (explicit?: string): string => {
if (explicit && fs.existsSync(explicit)) {
return explicit;
}
for (const candidate of DEFAULT_CONFIG_FILES) {
if (fs.existsSync(candidate)) {
return candidate;
}
}
return DEFAULT_CONFIG_FILES[0];
};
const readPipelineConfig = (configPath: string): PipelineConfig => {
if (!fs.existsSync(configPath)) {
return PipelineConfigSchema.parse({
paths: resolveProjectPaths(path.resolve(".")),
});
}
const raw = readJsonFile(configPath);
return PipelineConfigSchema.parse(raw);
};
const applyEnvironmentOverride = (
baseConfig: PipelineConfig,
basePath: string,
env?: string,
): PipelineConfig => {
if (!env || env === "development") {
return baseConfig;
}
const overridePath = resolveConfigPath(basePath, env);
if (!fs.existsSync(overridePath)) {
return baseConfig;
}
const overrides = PipelineConfigSchema.parse(readJsonFile(overridePath));
return mergePipelineConfig(baseConfig, overrides);
};
export const loadConfig = (options: LoadConfigOptions = {}): PipelineConfig => {
const basePath = locateConfigFile(options.configPath);
const config = applyEnvironmentOverride(
readPipelineConfig(basePath),
basePath,
options.env,
);
ensureDirectories(config.paths);
return config;
};
export const dumpConfig = (config: PipelineConfig, targetPath?: string): void => {
const destination = targetPath ?? locateConfigFile();
const normalized = PipelineConfigSchema.parse(config);
fs.mkdirSync(path.dirname(destination), { recursive: true });
fs.writeFileSync(destination, JSON.stringify(normalized, null, 2));
};
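A minimal usage sketch for the loader above, assuming the package resolves as @basango/crawler inside the workspace and that a config/pipeline.json (plus an optional pipeline.production.json) exists; the output path is illustrative:

import { loadConfig, dumpConfig } from "@basango/crawler";

// Resolve config/pipeline.json, merge the production override if present,
// and create the data/logs/configs directories on disk.
const config = loadConfig({ env: "production" });
console.log(config.logging.level, config.paths.data);

// Write the normalized configuration back out to a scratch location.
dumpConfig(config, "/tmp/pipeline.normalized.json");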
+4
@@ -0,0 +1,4 @@
export * from "./config";
export * from "./schema";
export * from "./utils";
export * from "./services/crawler";
+35
@@ -0,0 +1,35 @@
import { describe, expect, it } from "vitest";
import {
PipelineConfigSchema,
createDateRange,
formatDateRange,
isTimestampInRange,
PageRangeSpecSchema,
PageRangeSchema,
schemaToJSON,
} from "./schema";
describe("schema helpers", () => {
it("creates date range from spec", () => {
const range = createDateRange("2024-01-01:2024-01-31");
expect(range.start).toBeLessThan(range.end);
expect(formatDateRange(range)).toBe("2024-01-01:2024-01-31");
});
it("checks membership", () => {
const range = createDateRange("2024-01-01:2024-01-02");
expect(isTimestampInRange(range, range.start)).toBe(true);
expect(isTimestampInRange(range, range.start - 1)).toBe(false);
});
it("parses page range spec", () => {
const range = PageRangeSchema.parse(PageRangeSpecSchema.parse("1:10"));
expect(range).toEqual({ start: 1, end: 10 });
});
it("produces json schema", () => {
const json = schemaToJSON(PipelineConfigSchema);
expect(json.type).toBe("object");
});
});
+297
@@ -0,0 +1,297 @@
import path from "node:path";
import { getUnixTime, parse, isMatch, format as formatDate } from "date-fns";
import { z } from "zod";
export const UpdateDirectionSchema = z.enum(["forward", "backward"]);
export type UpdateDirection = z.infer<typeof UpdateDirectionSchema>;
export const SourceKindSchema = z.enum(["wordpress", "html"]);
export type SourceKind = z.infer<typeof SourceKindSchema>;
export const SourceDateSchema = z.object({
format: z.string().default("yyyy-LL-dd HH:mm"),
pattern: z.string().nullable().optional(),
replacement: z.string().nullable().optional(),
});
export type SourceDate = z.infer<typeof SourceDateSchema>;
export const SourceSelectorsSchema = z.object({
articles: z.string().optional().nullable(),
article_title: z.string().optional().nullable(),
article_link: z.string().optional().nullable(),
article_body: z.string().optional().nullable(),
article_date: z.string().optional().nullable(),
article_categories: z.string().optional().nullable(),
pagination: z.string().default("ul.pagination > li a"),
});
export type SourceSelectors = z.infer<typeof SourceSelectorsSchema>;
const BaseSourceSchema = z.object({
source_id: z.string(),
source_url: z.string().url(),
source_date: SourceDateSchema.default(SourceDateSchema.parse({})),
source_kind: SourceKindSchema,
categories: z.array(z.string()).default([]),
supports_categories: z.boolean().default(false),
requires_details: z.boolean().default(false),
requires_rate_limit: z.boolean().default(false),
});
export const HtmlSourceConfigSchema = BaseSourceSchema.extend({
source_kind: z.literal("html"),
source_selectors: SourceSelectorsSchema.default(SourceSelectorsSchema.parse({})),
pagination_template: z.string(),
});
export const WordPressSourceConfigSchema = BaseSourceSchema.extend({
source_kind: z.literal("wordpress"),
source_date: SourceDateSchema.default(
SourceDateSchema.parse({ format: "yyyy-LL-dd'T'HH:mm:ss" }),
),
});
export type HtmlSourceConfig = z.infer<typeof HtmlSourceConfigSchema>;
export type WordPressSourceConfig = z.infer<typeof WordPressSourceConfigSchema>;
export type AnySourceConfig = HtmlSourceConfig | WordPressSourceConfig;
export const DateRangeSchema = z
.object({
start: z.number().int(),
end: z.number().int(),
})
.superRefine((value, ctx) => {
if (value.start === 0 || value.end === 0) {
ctx.addIssue({
code: "custom",
message: "Timestamp cannot be zero",
});
}
if (value.end < value.start) {
ctx.addIssue({
code: "custom",
message: "End timestamp must be greater than or equal to start",
});
}
});
export type DateRange = z.infer<typeof DateRangeSchema>;
export const PageRangeSchema = z
.object({
start: z.number().int().min(0),
end: z.number().int().min(0),
})
.superRefine((value, ctx) => {
if (value.end < value.start) {
ctx.addIssue({
code: "custom",
message: "End page must be greater than or equal to start page",
});
}
});
export type PageRange = z.infer<typeof PageRangeSchema>;
export const PageRangeSpecSchema = z
.string()
.regex(/^[0-9]+:[0-9]+$/, "Invalid page range format. Use start:end")
.transform((spec) => {
const [startText, endText] = spec.split(":");
return {
start: Number.parseInt(startText, 10),
end: Number.parseInt(endText, 10),
};
});
const defaultDateFormat = "yyyy-LL-dd";
export const DateRangeSpecSchema = z
.string()
.regex(/.+:.+/, "Expected start:end format")
.transform((spec) => {
const [startRaw, endRaw] = spec.split(":");
return { startRaw, endRaw };
});
const parseDate = (value: string, format: string): Date => {
if (!isMatch(value, format)) {
throw new Error(`Invalid date '${value}' for format '${format}'`);
}
const parsed = parse(value, format, new Date());
if (Number.isNaN(parsed.getTime())) {
throw new Error(`Invalid date '${value}' for format '${format}'`);
}
return parsed;
};
export interface CreateDateRangeOptions {
format?: string;
separator?: string;
}
export const createDateRange = (
spec: string,
options: CreateDateRangeOptions = {},
): DateRange => {
const { format = defaultDateFormat, separator = ":" } = options;
if (!separator) {
throw new Error("Separator cannot be empty");
}
const normalized = spec.replace(separator, ":");
const parsedSpec = DateRangeSpecSchema.parse(normalized);
const startDate = parseDate(parsedSpec.startRaw, format);
const endDate = parseDate(parsedSpec.endRaw, format);
const range = {
start: getUnixTime(startDate),
end: getUnixTime(endDate),
};
return DateRangeSchema.parse(range);
};
export const formatDateRange = (range: DateRange, fmt = defaultDateFormat): string => {
const start = formatDate(new Date(range.start * 1000), fmt);
const end = formatDate(new Date(range.end * 1000), fmt);
return `${start}:${end}`;
};
export const isTimestampInRange = (range: DateRange, timestamp: number): boolean => {
return range.start <= timestamp && timestamp <= range.end;
};
export const ProjectPathsSchema = z.object({
root: z.string(),
data: z.string(),
logs: z.string(),
configs: z.string(),
});
export type ProjectPaths = z.infer<typeof ProjectPathsSchema>;
export const resolveProjectPaths = (rootDir: string): ProjectPaths => {
return ProjectPathsSchema.parse({
root: rootDir,
data: path.join(rootDir, "data", "dataset"),
logs: path.join(rootDir, "data", "logs"),
configs: path.join(rootDir, "config"),
});
};
export const LoggingConfigSchema = z.object({
level: z.string().default("INFO"),
format: z
.string()
.default("%(asctime)s - %(name)s - %(levelname)s - %(message)s"),
console_logging: z.boolean().default(true),
file_logging: z.boolean().default(false),
log_file: z.string().default("crawler.log"),
max_log_size: z.number().int().positive().default(10 * 1024 * 1024),
backup_count: z.number().int().nonnegative().default(5),
});
export type LoggingConfig = z.infer<typeof LoggingConfigSchema>;
export const ClientConfigSchema = z.object({
timeout: z.number().positive().default(20),
user_agent: z
.string()
.default("Basango/0.1 (+https://github.com/bernard-ng/basango)"),
follow_redirects: z.boolean().default(true),
verify_ssl: z.boolean().default(true),
rotate: z.boolean().default(true),
max_retries: z.number().int().nonnegative().default(3),
backoff_initial: z.number().nonnegative().default(1),
backoff_multiplier: z.number().positive().default(2),
backoff_max: z.number().nonnegative().default(30),
respect_retry_after: z.boolean().default(true),
});
export const CrawlerConfigSchema = z.object({
source: z.union([HtmlSourceConfigSchema, WordPressSourceConfigSchema]).optional(),
page_range: PageRangeSchema.optional(),
date_range: DateRangeSchema.optional(),
category: z.string().optional(),
notify: z.boolean().default(false),
is_update: z.boolean().default(false),
use_multi_threading: z.boolean().default(false),
max_workers: z.number().int().positive().default(5),
direction: UpdateDirectionSchema.default("forward"),
});
export type ClientConfig = z.infer<typeof ClientConfigSchema>;
export type CrawlerConfig = z.infer<typeof CrawlerConfigSchema> & {
source?: AnySourceConfig;
};
export const FetchConfigSchema = z.object({
client: ClientConfigSchema.default(ClientConfigSchema.parse({})),
crawler: CrawlerConfigSchema.default(CrawlerConfigSchema.parse({})),
});
export type FetchConfig = z.infer<typeof FetchConfigSchema>;
const SourcesConfigSchema = z.object({
html: z.array(HtmlSourceConfigSchema).default([]),
wordpress: z.array(WordPressSourceConfigSchema).default([]),
});
export type SourcesConfig = z.infer<typeof SourcesConfigSchema> & {
find: (sourceId: string) => AnySourceConfig | undefined;
};
export const createSourcesConfig = (input: unknown): SourcesConfig => {
const parsed = SourcesConfigSchema.parse(input);
const resolver = (sourceId: string) =>
[...parsed.html, ...parsed.wordpress].find((source) => source.source_id === sourceId);
return Object.assign({ find: resolver }, parsed);
};
export const PipelineConfigSchema = z.object({
paths: ProjectPathsSchema.default(resolveProjectPaths(process.cwd())),
logging: LoggingConfigSchema.default(LoggingConfigSchema.parse({})),
fetch: FetchConfigSchema.default(FetchConfigSchema.parse({})),
sources: z
.union([SourcesConfigSchema, z.undefined()])
.transform((value) => createSourcesConfig(value ?? {})),
});
export type PipelineConfig = z.infer<typeof PipelineConfigSchema> & {
sources: SourcesConfig;
};
export const mergePipelineConfig = (
base: PipelineConfig,
overrides: Partial<PipelineConfig>,
): PipelineConfig => {
const paths = overrides.paths ?? base.paths;
const logging = { ...base.logging, ...(overrides.logging ?? {}) };
const fetch = {
client: { ...base.fetch.client, ...(overrides.fetch?.client ?? {}) },
crawler: { ...base.fetch.crawler, ...(overrides.fetch?.crawler ?? {}) },
};
const sources = createSourcesConfig({
html: overrides.sources?.html ?? base.sources.html,
wordpress: overrides.sources?.wordpress ?? base.sources.wordpress,
});
return {
paths,
logging,
fetch,
sources,
};
};
export const resolveConfigPath = (basePath: string, env?: string): string => {
if (!env || env === "development") {
return basePath;
}
const ext = path.extname(basePath);
const withoutExt = basePath.slice(0, basePath.length - ext.length);
return `${withoutExt}.${env}${ext}`;
};
// Zod 4 exposes JSON Schema conversion as a top-level helper; transforms
// (e.g. the sources resolver) are emitted as permissive schemas.
export const schemaToJSON = <T extends z.ZodTypeAny>(schema: T) =>
z.toJSONSchema(schema, { unrepresentable: "any" });
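A short sketch of the date, page-range and config-path helpers defined above, assuming the same package entry point; the config path passed to resolveConfigPath is hypothetical:

import {
  createDateRange,
  formatDateRange,
  isTimestampInRange,
  PageRangeSpecSchema,
  resolveConfigPath,
} from "@basango/crawler";

// A "start:end" spec becomes an inclusive Unix-timestamp range.
const range = createDateRange("2024-01-01:2024-01-31");
console.log(formatDateRange(range)); // "2024-01-01:2024-01-31"
console.log(isTimestampInRange(range, range.start)); // true

// Page ranges use the same spec format but stay plain integers.
console.log(PageRangeSpecSchema.parse("1:10")); // { start: 1, end: 10 }

// Environment overrides live next to the base config file.
console.log(resolveConfigPath("/srv/basango/pipeline.json", "production"));
// -> "/srv/basango/pipeline.production.json"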
@@ -0,0 +1,58 @@
import { describe, expect, it } from "vitest";
import { createQueueManager, createQueueSettings } from "./queue";
class InMemoryQueue {
public jobs: Array<{ name: string; data: unknown }> = [];
async add(name: string, data: unknown) {
this.jobs.push({ name, data });
return { id: `${name}-${this.jobs.length}` };
}
}
describe("createQueueManager", () => {
it("prefixes queue names", () => {
const manager = createQueueManager({
settings: createQueueSettings({ prefix: "test" }),
queueFactory: (queueName) => {
expect(queueName).toBe("listing");
return new InMemoryQueue();
},
connection: {
quit: async () => undefined,
} as any,
});
expect(manager.iterQueueNames()).toEqual([
"test:listing",
"test:articles",
"test:processed",
]);
});
it("enqueues listing job with validated payload", async () => {
const queue = new InMemoryQueue();
const manager = createQueueManager({
queueFactory: () => queue,
connection: { quit: async () => undefined } as any,
});
const job = await manager.enqueueListing({
source_id: "radiookapi",
env: "test",
});
expect(job.id).toBe("collect_listing-1");
expect(queue.jobs[0]).toEqual({
name: "collect_listing",
data: {
source_id: "radiookapi",
env: "test",
page_range: undefined,
date_range: undefined,
category: undefined,
},
});
});
});
@@ -0,0 +1,118 @@
import { randomUUID } from "node:crypto";
import IORedis from "ioredis";
import { Queue, JobsOptions, QueueOptions } from "bullmq";
import { z } from "zod";
import {
ListingTaskPayload,
ArticleTaskPayload,
ProcessedTaskPayload,
ListingTaskPayloadSchema,
ArticleTaskPayloadSchema,
ProcessedTaskPayloadSchema,
} from "./schemas";
import { parseRedisUrl } from "../../../utils";
const QueueSettingsSchema = z.object({
redis_url: z.string().default(process.env.BASANGO_REDIS_URL ?? "redis://localhost:6379/0"),
prefix: z.string().default(process.env.BASANGO_QUEUE_PREFIX ?? "crawler"),
default_timeout: z.number().int().positive().default(Number(process.env.BASANGO_QUEUE_TIMEOUT ?? 600)),
result_ttl: z.number().int().nonnegative().default(Number(process.env.BASANGO_QUEUE_RESULT_TTL ?? 3600)),
failure_ttl: z.number().int().nonnegative().default(Number(process.env.BASANGO_QUEUE_FAILURE_TTL ?? 3600)),
listing_queue: z.string().default("listing"),
article_queue: z.string().default("articles"),
processed_queue: z.string().default("processed"),
});
export type QueueSettingsInput = z.input<typeof QueueSettingsSchema>;
export type QueueSettings = z.output<typeof QueueSettingsSchema>;
export const createQueueSettings = (input?: QueueSettingsInput): QueueSettings =>
QueueSettingsSchema.parse(input ?? {});
export interface QueueBackend<T = unknown> {
add: (name: string, data: T, opts?: JobsOptions) => Promise<{ id: string }>;
}
export type QueueFactory = (
queueName: string,
settings: QueueSettings,
connection?: IORedis,
) => QueueBackend;
const defaultQueueFactory: QueueFactory = (queueName, settings, connection) => {
const redisConnection = connection ?? new IORedis(settings.redis_url, parseRedisUrl(settings.redis_url));
const options: QueueOptions = {
connection: redisConnection,
prefix: settings.prefix,
};
const queue = new Queue(queueName, options);
return {
add: async (name, data, opts) => {
const job = await queue.add(name, data, {
// BullMQ has no per-job timeout option; map the TTL settings to its
// age-based retention instead (a TTL of 0 removes the job right away).
removeOnComplete: settings.result_ttl === 0 ? true : { age: settings.result_ttl },
removeOnFail: settings.failure_ttl === 0 ? true : { age: settings.failure_ttl },
...opts,
});
return { id: job.id ?? randomUUID() };
},
};
};
export interface CreateQueueManagerOptions {
settings?: QueueSettings | QueueSettingsInput;
queueFactory?: QueueFactory;
connection?: IORedis;
}
export interface QueueManager {
readonly settings: QueueSettings;
readonly connection: IORedis;
enqueueListing: (payload: ListingTaskPayload) => Promise<{ id: string }>;
enqueueArticle: (payload: ArticleTaskPayload) => Promise<{ id: string }>;
enqueueProcessed: (payload: ProcessedTaskPayload) => Promise<{ id: string }>;
iterQueueNames: () => string[];
queueName: (suffix: string) => string;
close: () => Promise<void>;
}
export const createQueueManager = (options: CreateQueueManagerOptions = {}): QueueManager => {
const settings = createQueueSettings(options.settings as QueueSettingsInput | undefined);
const connection = options.connection ?? new IORedis(settings.redis_url, parseRedisUrl(settings.redis_url));
const factory = options.queueFactory ?? defaultQueueFactory;
const ensureQueue = (queueName: string) => factory(queueName, settings, connection);
return {
settings,
connection,
enqueueListing: (payload) => {
const data = ListingTaskPayloadSchema.parse(payload);
const queue = ensureQueue(settings.listing_queue);
return queue.add("collect_listing", data);
},
enqueueArticle: (payload) => {
const data = ArticleTaskPayloadSchema.parse(payload);
const queue = ensureQueue(settings.article_queue);
return queue.add("collect_article", data);
},
enqueueProcessed: (payload) => {
const data = ProcessedTaskPayloadSchema.parse(payload);
const queue = ensureQueue(settings.processed_queue);
return queue.add("forward_for_processing", data);
},
iterQueueNames: () => [
`${settings.prefix}:${settings.listing_queue}`,
`${settings.prefix}:${settings.article_queue}`,
`${settings.prefix}:${settings.processed_queue}`,
],
queueName: (suffix: string) => `${settings.prefix}:${suffix}`,
close: async () => {
await connection.quit();
},
};
};
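A usage sketch for the queue manager, assuming a Redis instance is reachable at BASANGO_REDIS_URL (or redis://localhost:6379/0) and that the snippet runs in an ES module so top-level await is available; "radiookapi" is the source id used in the tests:

import { createQueueManager } from "@basango/crawler";

const manager = createQueueManager({ settings: { prefix: "crawler" } });

// Payloads are validated with Zod before being added to the listing queue.
const job = await manager.enqueueListing({
  source_id: "radiookapi",
  env: "production",
  page_range: "1:10",
});
console.log(job.id);
console.log(manager.iterQueueNames()); // ["crawler:listing", "crawler:articles", "crawler:processed"]

await manager.close();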
@@ -0,0 +1,37 @@
import { z } from "zod";
import { AnySourceConfig, DateRangeSchema, PageRangeSchema } from "../../../schema";
export const ListingTaskPayloadSchema = z.object({
source_id: z.string(),
env: z.string().default("development"),
page_range: z.string().optional().nullable(),
date_range: z.string().optional().nullable(),
category: z.string().optional().nullable(),
});
export type ListingTaskPayload = z.infer<typeof ListingTaskPayloadSchema>;
export const ArticleTaskPayloadSchema = z.object({
source_id: z.string(),
env: z.string().default("development"),
url: z.string().url(),
page: z.number().int().nonnegative().optional(),
page_range: PageRangeSchema.optional().nullable(),
date_range: DateRangeSchema.optional().nullable(),
category: z.string().optional().nullable(),
});
export type ArticleTaskPayload = z.infer<typeof ArticleTaskPayloadSchema>;
export const ProcessedTaskPayloadSchema = z.object({
source_id: z.string(),
env: z.string().default("development"),
article: z.any(),
});
export type ProcessedTaskPayload = z.infer<typeof ProcessedTaskPayloadSchema>;
export interface ListingContext {
source: AnySourceConfig;
}
@@ -0,0 +1,46 @@
import { describe, expect, it, vi } from "vitest";
import { scheduleAsyncCrawl, registerCrawlerTaskHandlers, collectListing } from "./tasks";
import { QueueManager } from "./queue";
describe("Async tasks", () => {
it("schedules crawl with provided manager", async () => {
const enqueueListing = vi.fn().mockResolvedValue({ id: "job-1" });
const manager = {
enqueueListing,
} as unknown as QueueManager;
const jobId = await scheduleAsyncCrawl({
sourceId: "radiookapi",
queueManager: manager,
});
expect(jobId).toBe("job-1");
expect(enqueueListing).toHaveBeenCalledWith({
source_id: "radiookapi",
env: "development",
page_range: undefined,
date_range: undefined,
category: undefined,
});
});
it("delegates listing collection to registered handler", async () => {
const handler = vi.fn().mockResolvedValue(5);
registerCrawlerTaskHandlers({ collectListing: handler });
const count = await collectListing({
source_id: "radiookapi",
env: "development",
});
expect(count).toBe(5);
expect(handler).toHaveBeenCalledWith({
source_id: "radiookapi",
env: "development",
page_range: undefined,
date_range: undefined,
category: undefined,
});
});
});
@@ -0,0 +1,88 @@
import {
ListingTaskPayloadSchema,
ArticleTaskPayloadSchema,
ProcessedTaskPayloadSchema,
ListingTaskPayload,
ArticleTaskPayload,
ProcessedTaskPayload,
} from "./schemas";
import {
createQueueManager,
QueueManager,
QueueSettings,
QueueSettingsInput,
} from "./queue";
export interface CrawlerTaskHandlers {
collectListing: (payload: ListingTaskPayload) => Promise<number> | number;
collectArticle: (payload: ArticleTaskPayload) => Promise<unknown> | unknown;
forwardForProcessing: (payload: ProcessedTaskPayload) => Promise<unknown> | unknown;
}
const notImplemented = (name: keyof CrawlerTaskHandlers) => () => {
throw new Error(`Crawler task handler '${name}' is not implemented`);
};
let handlers: CrawlerTaskHandlers = {
collectListing: notImplemented("collectListing"),
collectArticle: notImplemented("collectArticle"),
forwardForProcessing: notImplemented("forwardForProcessing"),
};
export const registerCrawlerTaskHandlers = (overrides: Partial<CrawlerTaskHandlers>): void => {
handlers = { ...handlers, ...overrides };
};
export interface ScheduleAsyncCrawlOptions {
sourceId: string;
env?: string;
pageRange?: string | null;
dateRange?: string | null;
category?: string | null;
settings?: QueueSettings | QueueSettingsInput;
queueManager?: QueueManager;
}
export const scheduleAsyncCrawl = async ({
sourceId,
env = "development",
pageRange,
dateRange,
category,
settings,
queueManager,
}: ScheduleAsyncCrawlOptions): Promise<string> => {
const payload = ListingTaskPayloadSchema.parse({
source_id: sourceId,
env,
page_range: pageRange ?? undefined,
date_range: dateRange ?? undefined,
category: category ?? undefined,
});
const manager = queueManager ?? createQueueManager({ settings });
try {
const job = await manager.enqueueListing(payload);
return job.id;
} finally {
if (!queueManager) {
await manager.close();
}
}
};
export const collectListing = async (payload: unknown): Promise<number> => {
const data = ListingTaskPayloadSchema.parse(payload);
const result = await handlers.collectListing(data);
return typeof result === "number" ? result : 0;
};
export const collectArticle = async (payload: unknown): Promise<unknown> => {
const data = ArticleTaskPayloadSchema.parse(payload);
return handlers.collectArticle(data);
};
export const forwardForProcessing = async (payload: unknown): Promise<unknown> => {
const data = ProcessedTaskPayloadSchema.parse(payload);
return handlers.forwardForProcessing(data);
};
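A sketch of how a concrete handler and the scheduler fit together, again assuming Redis is reachable and an ES module context; the handler body is a placeholder:

import { registerCrawlerTaskHandlers, scheduleAsyncCrawl } from "@basango/crawler";

// Register a real listing handler before any worker starts consuming jobs.
registerCrawlerTaskHandlers({
  collectListing: async (payload) => {
    console.log(`collecting listing for ${payload.source_id} (${payload.env})`);
    return 0; // number of pages collected
  },
});

// Without an explicit queueManager, a temporary one is created and closed
// once the listing job has been enqueued.
const jobId = await scheduleAsyncCrawl({ sourceId: "radiookapi", pageRange: "1:5" });
console.log(jobId);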
@@ -0,0 +1,83 @@
import IORedis from "ioredis";
import { Worker, QueueEvents } from "bullmq";
import {
createQueueManager,
QueueFactory,
QueueManager,
QueueSettings,
QueueSettingsInput,
} from "./queue";
import { collectArticle, collectListing, forwardForProcessing } from "./tasks";
export interface WorkerOptions {
queueNames?: string[];
settings?: QueueSettings | QueueSettingsInput;
connection?: IORedis;
queueFactory?: QueueFactory;
concurrency?: number;
onError?: (error: Error) => void;
queueManager?: QueueManager;
}
export interface WorkerHandle {
readonly workers: Worker[];
readonly events: QueueEvents[];
close: () => Promise<void>;
}
export const startWorker = (options: WorkerOptions = {}): WorkerHandle => {
const manager =
options.queueManager ??
createQueueManager({
settings: options.settings,
connection: options.connection,
queueFactory: options.queueFactory,
});
const settings = manager.settings;
// The queues are created with BullMQ's `prefix` option and unprefixed names,
// so the workers must target the same unprefixed names (plus the prefix
// option) to read the matching Redis keys.
const queueNames = options.queueNames ?? [
settings.listing_queue,
settings.article_queue,
settings.processed_queue,
];
const workers: Worker[] = [];
const events: QueueEvents[] = [];
const connection = manager.connection;
for (const queueName of queueNames) {
const worker = new Worker(queueName, async (job) => {
switch (job.name) {
case "collect_listing":
return collectListing(job.data);
case "collect_article":
return collectArticle(job.data);
case "forward_for_processing":
return forwardForProcessing(job.data);
default:
throw new Error(`Unknown job name: ${job.name}`);
}
}, {
connection,
prefix: settings.prefix,
concurrency: options.concurrency ?? 5,
});
if (options.onError) {
worker.on("failed", (_, err) => options.onError?.(err as Error));
worker.on("error", (err) => options.onError?.(err as Error));
}
const queueEvents = new QueueEvents(queueName, { connection, prefix: settings.prefix });
workers.push(worker);
events.push(queueEvents);
}
return {
workers,
events,
close: async () => {
await Promise.all(workers.map((worker) => worker.close()));
await Promise.all(events.map((event) => event.close()));
if (!options.queueManager) {
await manager.close();
}
},
};
};
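A sketch for running the workers, assuming task handlers were registered beforehand and Redis is available; the concurrency value is arbitrary:

import { startWorker } from "@basango/crawler";

// One BullMQ Worker (plus QueueEvents) is started per configured queue.
const handle = startWorker({
  concurrency: 2,
  onError: (error) => console.error("worker error", error),
});

// On shutdown, close workers, event listeners and the shared connection.
process.on("SIGINT", () => {
  void handle.close().then(() => process.exit(0));
});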
@@ -0,0 +1,3 @@
export * from "./async/queue";
export * from "./async/tasks";
export * from "./async/worker";
+38
@@ -0,0 +1,38 @@
import fs from "node:fs";
import type { RedisOptions } from "ioredis";
import { get_encoding } from "tiktoken";
import type { ProjectPaths } from "./schema";
export const ensureDirectories = (paths: ProjectPaths): void => {
for (const dir of [paths.data, paths.logs, paths.configs]) {
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
}
};
export const parseRedisUrl = (url: string): RedisOptions => {
if (!url.startsWith("redis://")) {
return {};
}
const parsed = new URL(url);
return {
host: parsed.hostname,
port: Number(parsed.port || 6379),
password: parsed.password || undefined,
db: Number(parsed.pathname?.replace("/", "") || 0),
// BullMQ requires its blocking connections to disable command retries.
maxRetriesPerRequest: null,
};
};
export const countTokens = (text: string, encoding: Parameters<typeof get_encoding>[0] = "cl100k_base"): number => {
try {
const encoder = get_encoding(encoding);
const tokens = encoder.encode(text);
encoder.free();
return tokens.length;
} catch {
return text.length;
}
};
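A short sketch of the utility helpers above; the Redis URL and sample text are illustrative:

import { countTokens, parseRedisUrl } from "@basango/crawler";

// Split a Redis URL into ioredis connection options (host, port, password, db).
const redis = parseRedisUrl("redis://:secret@localhost:6380/2");
console.log(redis.host, redis.port, redis.db); // localhost 6380 2

// Count tokens with tiktoken's cl100k_base encoding; if the encoder cannot
// be loaded, the character length is returned instead.
console.log(countTokens("hello from the basango crawler"));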
+9
@@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"include": ["src"],
"references": []
}
+9
@@ -0,0 +1,9 @@
import { defineConfig } from "vitest/config";
export default defineConfig({
test: {
environment: "node",
globals: true,
include: ["src/**/*.test.ts"],
},
});
+17
@@ -0,0 +1,17 @@
{
"name": "basango-monorepo",
"private": true,
"version": "0.1.0",
"workspaces": [
"apps/*"
],
"scripts": {
"build": "tsc -b",
"test": "vitest --run"
},
"devDependencies": {
"typescript": "^5.4.0",
"vitest": "^1.6.0",
"@types/node": "^20.11.30"
}
}
+15
@@ -0,0 +1,15 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "ESNext",
"moduleResolution": "Node",
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"strict": true,
"skipLibCheck": true,
"types": ["node"],
"resolveJsonModule": true,
"declaration": true,
"outDir": "dist"
}
}
+7
@@ -0,0 +1,7 @@
{
"extends": "./tsconfig.base.json",
"files": [],
"references": [
{ "path": "./apps/crawler" }
]
}