refactor: centralize crawler schemas and json config
@@ -0,0 +1,19 @@
{
  "name": "@basango/crawler",
  "version": "0.1.0",
  "private": true,
  "type": "module",
  "main": "dist/index.js",
  "types": "dist/index.d.ts",
  "scripts": {
    "build": "tsc -b",
    "test": "vitest --run"
  },
  "dependencies": {
    "bullmq": "^4.17.0",
    "date-fns": "^3.6.0",
    "ioredis": "^5.3.2",
    "tiktoken": "^1.0.14",
    "zod": "^4.0.0"
  }
}
@@ -0,0 +1,81 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";

import { describe, expect, it } from "vitest";

import { loadConfig } from "./config";
import { resolveConfigPath } from "./schema";

describe("loadConfig", () => {
  it("parses json configuration and ensures directories", () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "crawler-config-"));
    const paths = {
      root: tempDir,
      data: path.join(tempDir, "data"),
      logs: path.join(tempDir, "logs"),
      configs: path.join(tempDir, "configs"),
    };

    const configPath = path.join(tempDir, "pipeline.json");
    fs.writeFileSync(
      configPath,
      JSON.stringify(
        {
          paths,
          fetch: {
            client: { timeout: 10 },
          },
        },
        null,
        2,
      ),
    );

    const config = loadConfig({ configPath });

    expect(config.fetch.client.timeout).toBe(10);
    expect(fs.existsSync(paths.data)).toBe(true);
    expect(fs.existsSync(paths.logs)).toBe(true);
    expect(fs.existsSync(paths.configs)).toBe(true);
  });

  it("merges environment override if available", () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "crawler-config-"));
    const paths = {
      root: tempDir,
      data: path.join(tempDir, "data"),
      logs: path.join(tempDir, "logs"),
      configs: path.join(tempDir, "configs"),
    };

    const basePath = path.join(tempDir, "pipeline.json");
    fs.writeFileSync(
      basePath,
      JSON.stringify(
        {
          paths,
          logging: { level: "INFO" },
        },
        null,
        2,
      ),
    );

    const overridePath = resolveConfigPath(basePath, "production");
    fs.writeFileSync(
      overridePath,
      JSON.stringify(
        {
          logging: { level: "DEBUG" },
        },
        null,
        2,
      ),
    );

    const config = loadConfig({ configPath: basePath, env: "production" });

    expect(config.logging.level).toBe("DEBUG");
  });
});
@@ -0,0 +1,91 @@
import fs from "node:fs";
import path from "node:path";

import {
  PipelineConfig,
  PipelineConfigSchema,
  mergePipelineConfig,
  resolveConfigPath,
  resolveProjectPaths,
} from "./schema";
import { ensureDirectories } from "./utils";

export interface LoadConfigOptions {
  configPath?: string;
  env?: string;
}

const DEFAULT_CONFIG_FILES = [
  path.join(process.cwd(), "config", "pipeline.json"),
  path.join(process.cwd(), "pipeline.json"),
];

const readJsonFile = (filePath: string): unknown => {
  const contents = fs.readFileSync(filePath, "utf-8");
  return contents.trim() === "" ? {} : JSON.parse(contents);
};

const locateConfigFile = (explicit?: string): string => {
  if (explicit && fs.existsSync(explicit)) {
    return explicit;
  }

  for (const candidate of DEFAULT_CONFIG_FILES) {
    if (fs.existsSync(candidate)) {
      return candidate;
    }
  }

  return DEFAULT_CONFIG_FILES[0];
};

const readPipelineConfig = (configPath: string): PipelineConfig => {
  if (!fs.existsSync(configPath)) {
    return PipelineConfigSchema.parse({
      paths: resolveProjectPaths(path.resolve(".")),
    });
  }

  const raw = readJsonFile(configPath);
  return PipelineConfigSchema.parse(raw);
};

const applyEnvironmentOverride = (
  baseConfig: PipelineConfig,
  basePath: string,
  env?: string,
): PipelineConfig => {
  if (!env || env === "development") {
    return baseConfig;
  }

  const overridePath = resolveConfigPath(basePath, env);
  if (!fs.existsSync(overridePath)) {
    return baseConfig;
  }

  // Keep the override file raw: running it through PipelineConfigSchema here
  // would fill every omitted field with schema defaults, and those defaults
  // would then clobber the base config's values during the merge.
  const overrides = readJsonFile(overridePath) as Partial<PipelineConfig>;
  return mergePipelineConfig(baseConfig, overrides);
};

export const loadConfig = (options: LoadConfigOptions = {}): PipelineConfig => {
  const basePath = locateConfigFile(options.configPath);
  const config = applyEnvironmentOverride(
    readPipelineConfig(basePath),
    basePath,
    options.env,
  );

  ensureDirectories(config.paths);
  return config;
};

export const dumpConfig = (config: PipelineConfig, targetPath?: string): void => {
  const destination = targetPath ?? locateConfigFile();
  const normalized = PipelineConfigSchema.parse(config);
  fs.mkdirSync(path.dirname(destination), { recursive: true });
  fs.writeFileSync(destination, JSON.stringify(normalized, null, 2));
};
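
A minimal usage sketch of the loader (the env name and file locations below are illustrative, following resolveConfigPath's "<base>.<env>.<ext>" convention; they are not part of this commit):

import { loadConfig, dumpConfig } from "@basango/crawler";

// Reads config/pipeline.json (or ./pipeline.json), merges
// config/pipeline.production.json on top, and creates the
// data/logs/configs directories as a side effect.
const config = loadConfig({ env: "production" });
console.log(config.fetch.client.timeout);

// Writes the normalized config back out with defaults filled in.
dumpConfig(config, "config/pipeline.json");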
@@ -0,0 +1,4 @@
export * from "./config";
export * from "./schema";
export * from "./utils";
export * from "./services/crawler";
@@ -0,0 +1,35 @@
import { describe, expect, it } from "vitest";

import {
  PipelineConfigSchema,
  createDateRange,
  formatDateRange,
  isTimestampInRange,
  PageRangeSpecSchema,
  PageRangeSchema,
  schemaToJSON,
} from "./schema";

describe("schema helpers", () => {
  it("creates date range from spec", () => {
    const range = createDateRange("2024-01-01:2024-01-31");
    expect(range.start).toBeLessThan(range.end);
    expect(formatDateRange(range)).toBe("2024-01-01:2024-01-31");
  });

  it("checks membership", () => {
    const range = createDateRange("2024-01-01:2024-01-02");
    expect(isTimestampInRange(range, range.start)).toBe(true);
    expect(isTimestampInRange(range, range.start - 1)).toBe(false);
  });

  it("parses page range spec", () => {
    const range = PageRangeSchema.parse(PageRangeSpecSchema.parse("1:10"));
    expect(range).toEqual({ start: 1, end: 10 });
  });

  it("produces json schema", () => {
    const json = schemaToJSON(PipelineConfigSchema);
    expect(json.type).toBe("object");
  });
});
@@ -0,0 +1,300 @@
import path from "node:path";

import { getUnixTime, parse, isMatch, format as formatDate } from "date-fns";
import { z } from "zod";

export const UpdateDirectionSchema = z.enum(["forward", "backward"]);
export type UpdateDirection = z.infer<typeof UpdateDirectionSchema>;

export const SourceKindSchema = z.enum(["wordpress", "html"]);
export type SourceKind = z.infer<typeof SourceKindSchema>;

export const SourceDateSchema = z.object({
  format: z.string().default("yyyy-LL-dd HH:mm"),
  pattern: z.string().nullable().optional(),
  replacement: z.string().nullable().optional(),
});
export type SourceDate = z.infer<typeof SourceDateSchema>;

export const SourceSelectorsSchema = z.object({
  articles: z.string().optional().nullable(),
  article_title: z.string().optional().nullable(),
  article_link: z.string().optional().nullable(),
  article_body: z.string().optional().nullable(),
  article_date: z.string().optional().nullable(),
  article_categories: z.string().optional().nullable(),
  pagination: z.string().default("ul.pagination > li a"),
});
export type SourceSelectors = z.infer<typeof SourceSelectorsSchema>;

const BaseSourceSchema = z.object({
  source_id: z.string(),
  source_url: z.string().url(),
  source_date: SourceDateSchema.default(SourceDateSchema.parse({})),
  source_kind: SourceKindSchema,
  categories: z.array(z.string()).default([]),
  supports_categories: z.boolean().default(false),
  requires_details: z.boolean().default(false),
  requires_rate_limit: z.boolean().default(false),
});

export const HtmlSourceConfigSchema = BaseSourceSchema.extend({
  source_kind: z.literal("html"),
  source_selectors: SourceSelectorsSchema.default(SourceSelectorsSchema.parse({})),
  pagination_template: z.string(),
});

export const WordPressSourceConfigSchema = BaseSourceSchema.extend({
  source_kind: z.literal("wordpress"),
  source_date: SourceDateSchema.default(
    SourceDateSchema.parse({ format: "yyyy-LL-dd'T'HH:mm:ss" }),
  ),
});

export type HtmlSourceConfig = z.infer<typeof HtmlSourceConfigSchema>;
export type WordPressSourceConfig = z.infer<typeof WordPressSourceConfigSchema>;
export type AnySourceConfig = HtmlSourceConfig | WordPressSourceConfig;

export const DateRangeSchema = z
  .object({
    start: z.number().int(),
    end: z.number().int(),
  })
  .superRefine((value, ctx) => {
    if (value.start === 0 || value.end === 0) {
      ctx.addIssue({
        code: "custom",
        message: "Timestamp cannot be zero",
      });
    }
    if (value.end < value.start) {
      ctx.addIssue({
        code: "custom",
        message: "End timestamp must be greater than or equal to start",
      });
    }
  });

export type DateRange = z.infer<typeof DateRangeSchema>;

export const PageRangeSchema = z
  .object({
    start: z.number().int().min(0),
    end: z.number().int().min(0),
  })
  .superRefine((value, ctx) => {
    if (value.end < value.start) {
      ctx.addIssue({
        code: "custom",
        message: "End page must be greater than or equal to start page",
      });
    }
  });

export type PageRange = z.infer<typeof PageRangeSchema>;

export const PageRangeSpecSchema = z
  .string()
  .regex(/^[0-9]+:[0-9]+$/, "Invalid page range format. Use start:end")
  .transform((spec) => {
    const [startText, endText] = spec.split(":");
    return {
      start: Number.parseInt(startText, 10),
      end: Number.parseInt(endText, 10),
    };
  });

const defaultDateFormat = "yyyy-LL-dd";

export const DateRangeSpecSchema = z
  .string()
  .regex(/.+:.+/, "Expected start:end format")
  .transform((spec) => {
    const [startRaw, endRaw] = spec.split(":");
    return { startRaw, endRaw };
  });

const parseDate = (value: string, format: string): Date => {
  if (!isMatch(value, format)) {
    throw new Error(`Invalid date '${value}' for format '${format}'`);
  }
  const parsed = parse(value, format, new Date());
  if (Number.isNaN(parsed.getTime())) {
    throw new Error(`Invalid date '${value}' for format '${format}'`);
  }
  return parsed;
};

export interface CreateDateRangeOptions {
  format?: string;
  separator?: string;
}

export const createDateRange = (
  spec: string,
  options: CreateDateRangeOptions = {},
): DateRange => {
  const { format = defaultDateFormat, separator = ":" } = options;
  if (!separator) {
    throw new Error("Separator cannot be empty");
  }

  const normalized = spec.replace(separator, ":");
  const parsedSpec = DateRangeSpecSchema.parse(normalized);

  const startDate = parseDate(parsedSpec.startRaw, format);
  const endDate = parseDate(parsedSpec.endRaw, format);

  const range = {
    start: getUnixTime(startDate),
    end: getUnixTime(endDate),
  };

  return DateRangeSchema.parse(range);
};

export const formatDateRange = (range: DateRange, fmt = defaultDateFormat): string => {
  const start = formatDate(new Date(range.start * 1000), fmt);
  const end = formatDate(new Date(range.end * 1000), fmt);
  return `${start}:${end}`;
};

export const isTimestampInRange = (range: DateRange, timestamp: number): boolean => {
  return range.start <= timestamp && timestamp <= range.end;
};

export const ProjectPathsSchema = z.object({
  root: z.string(),
  data: z.string(),
  logs: z.string(),
  configs: z.string(),
});
export type ProjectPaths = z.infer<typeof ProjectPathsSchema>;

export const resolveProjectPaths = (rootDir: string): ProjectPaths => {
  return ProjectPathsSchema.parse({
    root: rootDir,
    data: path.join(rootDir, "data", "dataset"),
    logs: path.join(rootDir, "data", "logs"),
    configs: path.join(rootDir, "config"),
  });
};

export const LoggingConfigSchema = z.object({
  level: z.string().default("INFO"),
  format: z
    .string()
    .default("%(asctime)s - %(name)s - %(levelname)s - %(message)s"),
  console_logging: z.boolean().default(true),
  file_logging: z.boolean().default(false),
  log_file: z.string().default("crawler.log"),
  max_log_size: z.number().int().positive().default(10 * 1024 * 1024),
  backup_count: z.number().int().nonnegative().default(5),
});
export type LoggingConfig = z.infer<typeof LoggingConfigSchema>;

export const ClientConfigSchema = z.object({
  timeout: z.number().positive().default(20),
  user_agent: z
    .string()
    .default("Basango/0.1 (+https://github.com/bernard-ng/basango)"),
  follow_redirects: z.boolean().default(true),
  verify_ssl: z.boolean().default(true),
  rotate: z.boolean().default(true),
  max_retries: z.number().int().nonnegative().default(3),
  backoff_initial: z.number().nonnegative().default(1),
  backoff_multiplier: z.number().positive().default(2),
  backoff_max: z.number().nonnegative().default(30),
  respect_retry_after: z.boolean().default(true),
});

export const CrawlerConfigSchema = z.object({
  source: z.union([HtmlSourceConfigSchema, WordPressSourceConfigSchema]).optional(),
  page_range: PageRangeSchema.optional(),
  date_range: DateRangeSchema.optional(),
  category: z.string().optional(),
  notify: z.boolean().default(false),
  is_update: z.boolean().default(false),
  use_multi_threading: z.boolean().default(false),
  max_workers: z.number().int().positive().default(5),
  direction: UpdateDirectionSchema.default("forward"),
});

export type ClientConfig = z.infer<typeof ClientConfigSchema>;
export type CrawlerConfig = z.infer<typeof CrawlerConfigSchema> & {
  source?: AnySourceConfig;
};

export const FetchConfigSchema = z.object({
  client: ClientConfigSchema.default(ClientConfigSchema.parse({})),
  crawler: CrawlerConfigSchema.default(CrawlerConfigSchema.parse({})),
});
export type FetchConfig = z.infer<typeof FetchConfigSchema>;

const SourcesConfigSchema = z.object({
  html: z.array(HtmlSourceConfigSchema).default([]),
  wordpress: z.array(WordPressSourceConfigSchema).default([]),
});

export type SourcesConfig = z.infer<typeof SourcesConfigSchema> & {
  find: (sourceId: string) => AnySourceConfig | undefined;
};

export const createSourcesConfig = (input: unknown): SourcesConfig => {
  const parsed = SourcesConfigSchema.parse(input);
  const resolver = (sourceId: string) =>
    [...parsed.html, ...parsed.wordpress].find((source) => source.source_id === sourceId);
  return Object.assign({ find: resolver }, parsed);
};

export const PipelineConfigSchema = z.object({
  paths: ProjectPathsSchema.default(resolveProjectPaths(process.cwd())),
  logging: LoggingConfigSchema.default(LoggingConfigSchema.parse({})),
  fetch: FetchConfigSchema.default(FetchConfigSchema.parse({})),
  sources: z
    .union([SourcesConfigSchema, z.undefined()])
    .transform((value) => createSourcesConfig(value ?? {})),
});

export type PipelineConfig = z.infer<typeof PipelineConfigSchema> & {
  sources: SourcesConfig;
};

export const mergePipelineConfig = (
  base: PipelineConfig,
  overrides: Partial<PipelineConfig>,
): PipelineConfig => {
  const paths = overrides.paths ?? base.paths;
  const logging = { ...base.logging, ...(overrides.logging ?? {}) };
  const fetch = {
    client: { ...base.fetch.client, ...(overrides.fetch?.client ?? {}) },
    crawler: { ...base.fetch.crawler, ...(overrides.fetch?.crawler ?? {}) },
  };

  const sources = createSourcesConfig({
    html: overrides.sources?.html ?? base.sources.html,
    wordpress: overrides.sources?.wordpress ?? base.sources.wordpress,
  });

  return {
    paths,
    logging,
    fetch,
    sources,
  };
};

export const resolveConfigPath = (basePath: string, env?: string): string => {
  if (!env || env === "development") {
    return basePath;
  }

  const ext = path.extname(basePath);
  const withoutExt = basePath.slice(0, basePath.length - ext.length);
  return `${withoutExt}.${env}${ext}`;
};

// Zod 4 exposes JSON Schema generation as z.toJSONSchema; the io/unrepresentable
// options keep the sources transform from making generation throw.
export const schemaToJSON = <T extends z.ZodTypeAny>(schema: T) =>
  z.toJSONSchema(schema, { io: "input", unrepresentable: "any" });
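
A short sketch of the range helpers and source lookup (the source entry is a hypothetical example; "radiookapi" matches the id used in the tests):

import { createDateRange, isTimestampInRange, createSourcesConfig } from "@basango/crawler";

// "start:end" specs become validated unix-timestamp ranges.
const range = createDateRange("2024-01-01:2024-01-31");
isTimestampInRange(range, range.start + 3600); // true

// createSourcesConfig attaches a find() resolver spanning both source kinds.
const sources = createSourcesConfig({
  wordpress: [{ source_id: "radiookapi", source_url: "https://example.org", source_kind: "wordpress" }],
});
sources.find("radiookapi")?.source_url; // "https://example.org"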
@@ -0,0 +1,58 @@
import { describe, expect, it } from "vitest";

import { createQueueManager, createQueueSettings } from "./queue";

class InMemoryQueue {
  public jobs: Array<{ name: string; data: unknown }> = [];

  async add(name: string, data: unknown) {
    this.jobs.push({ name, data });
    return { id: `${name}-${this.jobs.length}` };
  }
}

describe("createQueueManager", () => {
  it("prefixes queue names", () => {
    const manager = createQueueManager({
      settings: createQueueSettings({ prefix: "test" }),
      queueFactory: (queueName) => {
        expect(queueName).toBe("listing");
        return new InMemoryQueue();
      },
      connection: {
        quit: async () => undefined,
      } as any,
    });

    expect(manager.iterQueueNames()).toEqual([
      "test:listing",
      "test:articles",
      "test:processed",
    ]);
  });

  it("enqueues listing job with validated payload", async () => {
    const queue = new InMemoryQueue();
    const manager = createQueueManager({
      queueFactory: () => queue,
      connection: { quit: async () => undefined } as any,
    });

    const job = await manager.enqueueListing({
      source_id: "radiookapi",
      env: "test",
    });

    expect(job.id).toBe("collect_listing-1");
    expect(queue.jobs[0]).toEqual({
      name: "collect_listing",
      data: {
        source_id: "radiookapi",
        env: "test",
        page_range: undefined,
        date_range: undefined,
        category: undefined,
      },
    });
  });
});
@@ -0,0 +1,119 @@
import { randomUUID } from "node:crypto";

import IORedis from "ioredis";
import { Queue, JobsOptions, QueueOptions } from "bullmq";
import { z } from "zod";

import {
  ListingTaskPayload,
  ArticleTaskPayload,
  ProcessedTaskPayload,
  ListingTaskPayloadSchema,
  ArticleTaskPayloadSchema,
  ProcessedTaskPayloadSchema,
} from "./schemas";
import { parseRedisUrl } from "../../../utils";

const QueueSettingsSchema = z.object({
  redis_url: z.string().default(process.env.BASANGO_REDIS_URL ?? "redis://localhost:6379/0"),
  prefix: z.string().default(process.env.BASANGO_QUEUE_PREFIX ?? "crawler"),
  default_timeout: z.number().int().positive().default(Number(process.env.BASANGO_QUEUE_TIMEOUT ?? 600)),
  result_ttl: z.number().int().nonnegative().default(Number(process.env.BASANGO_QUEUE_RESULT_TTL ?? 3600)),
  failure_ttl: z.number().int().nonnegative().default(Number(process.env.BASANGO_QUEUE_FAILURE_TTL ?? 3600)),
  listing_queue: z.string().default("listing"),
  article_queue: z.string().default("articles"),
  processed_queue: z.string().default("processed"),
});

export type QueueSettingsInput = z.input<typeof QueueSettingsSchema>;
export type QueueSettings = z.output<typeof QueueSettingsSchema>;

export const createQueueSettings = (input?: QueueSettingsInput): QueueSettings =>
  QueueSettingsSchema.parse(input ?? {});

export interface QueueBackend<T = unknown> {
  add: (name: string, data: T, opts?: JobsOptions) => Promise<{ id: string }>;
}

export type QueueFactory = (
  queueName: string,
  settings: QueueSettings,
  connection?: IORedis,
) => QueueBackend;

const defaultQueueFactory: QueueFactory = (queueName, settings, connection) => {
  const redisConnection = connection ?? new IORedis(settings.redis_url, parseRedisUrl(settings.redis_url));
  const options: QueueOptions = {
    connection: redisConnection,
    prefix: settings.prefix,
  };

  const queue = new Queue(queueName, options);
  return {
    add: async (name, data, opts) => {
      // BullMQ has no per-job execution timeout, so default_timeout is not
      // applied here; the ttl settings map onto job retention instead.
      const job = await queue.add(name, data, {
        removeOnComplete: settings.result_ttl === 0 ? true : { age: settings.result_ttl },
        removeOnFail: settings.failure_ttl === 0 ? true : { age: settings.failure_ttl },
        ...opts,
      });
      return { id: job.id ?? randomUUID() };
    },
  };
};

export interface CreateQueueManagerOptions {
  settings?: QueueSettings | QueueSettingsInput;
  queueFactory?: QueueFactory;
  connection?: IORedis;
}

export interface QueueManager {
  readonly settings: QueueSettings;
  readonly connection: IORedis;
  enqueueListing: (payload: ListingTaskPayload) => Promise<{ id: string }>;
  enqueueArticle: (payload: ArticleTaskPayload) => Promise<{ id: string }>;
  enqueueProcessed: (payload: ProcessedTaskPayload) => Promise<{ id: string }>;
  iterQueueNames: () => string[];
  queueName: (suffix: string) => string;
  close: () => Promise<void>;
}

export const createQueueManager = (options: CreateQueueManagerOptions = {}): QueueManager => {
  const settings = createQueueSettings(options.settings as QueueSettingsInput | undefined);

  const connection = options.connection ?? new IORedis(settings.redis_url, parseRedisUrl(settings.redis_url));
  const factory = options.queueFactory ?? defaultQueueFactory;

  const ensureQueue = (queueName: string) => factory(queueName, settings, connection);

  return {
    settings,
    connection,
    enqueueListing: (payload) => {
      const data = ListingTaskPayloadSchema.parse(payload);
      const queue = ensureQueue(settings.listing_queue);
      return queue.add("collect_listing", data);
    },
    enqueueArticle: (payload) => {
      const data = ArticleTaskPayloadSchema.parse(payload);
      const queue = ensureQueue(settings.article_queue);
      return queue.add("collect_article", data);
    },
    enqueueProcessed: (payload) => {
      const data = ProcessedTaskPayloadSchema.parse(payload);
      const queue = ensureQueue(settings.processed_queue);
      return queue.add("forward_for_processing", data);
    },
    iterQueueNames: () => [
      `${settings.prefix}:${settings.listing_queue}`,
      `${settings.prefix}:${settings.article_queue}`,
      `${settings.prefix}:${settings.processed_queue}`,
    ],
    queueName: (suffix: string) => `${settings.prefix}:${suffix}`,
    close: async () => {
      await connection.quit();
    },
  };
};
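
A minimal enqueue sketch (assumes a Redis reachable at the default URL; the payload mirrors the one used in the tests):

import { createQueueManager } from "@basango/crawler";

const manager = createQueueManager({ settings: { prefix: "crawler" } });
const job = await manager.enqueueListing({
  source_id: "radiookapi",
  env: "development",
});
console.log(job.id, manager.iterQueueNames()); // crawler:listing, crawler:articles, crawler:processed
await manager.close();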
@@ -0,0 +1,37 @@
import { z } from "zod";

import { AnySourceConfig, DateRangeSchema, PageRangeSchema } from "../../../schema";

export const ListingTaskPayloadSchema = z.object({
  source_id: z.string(),
  env: z.string().default("development"),
  page_range: z.string().optional().nullable(),
  date_range: z.string().optional().nullable(),
  category: z.string().optional().nullable(),
});

export type ListingTaskPayload = z.infer<typeof ListingTaskPayloadSchema>;

export const ArticleTaskPayloadSchema = z.object({
  source_id: z.string(),
  env: z.string().default("development"),
  url: z.string().url(),
  page: z.number().int().nonnegative().optional(),
  page_range: PageRangeSchema.optional().nullable(),
  date_range: DateRangeSchema.optional().nullable(),
  category: z.string().optional().nullable(),
});

export type ArticleTaskPayload = z.infer<typeof ArticleTaskPayloadSchema>;

export const ProcessedTaskPayloadSchema = z.object({
  source_id: z.string(),
  env: z.string().default("development"),
  article: z.any(),
});

export type ProcessedTaskPayload = z.infer<typeof ProcessedTaskPayloadSchema>;

export interface ListingContext {
  source: AnySourceConfig;
}
@@ -0,0 +1,46 @@
import { describe, expect, it, vi } from "vitest";

import { scheduleAsyncCrawl, registerCrawlerTaskHandlers, collectListing } from "./tasks";
import { QueueManager } from "./queue";

describe("Async tasks", () => {
  it("schedules crawl with provided manager", async () => {
    const enqueueListing = vi.fn().mockResolvedValue({ id: "job-1" });
    const manager = {
      enqueueListing,
    } as unknown as QueueManager;

    const jobId = await scheduleAsyncCrawl({
      sourceId: "radiookapi",
      queueManager: manager,
    });

    expect(jobId).toBe("job-1");
    expect(enqueueListing).toHaveBeenCalledWith({
      source_id: "radiookapi",
      env: "development",
      page_range: undefined,
      date_range: undefined,
      category: undefined,
    });
  });

  it("delegates listing collection to registered handler", async () => {
    const handler = vi.fn().mockResolvedValue(5);
    registerCrawlerTaskHandlers({ collectListing: handler });

    const count = await collectListing({
      source_id: "radiookapi",
      env: "development",
    });

    expect(count).toBe(5);
    expect(handler).toHaveBeenCalledWith({
      source_id: "radiookapi",
      env: "development",
      page_range: undefined,
      date_range: undefined,
      category: undefined,
    });
  });
});
@@ -0,0 +1,88 @@
import {
  ListingTaskPayloadSchema,
  ArticleTaskPayloadSchema,
  ProcessedTaskPayloadSchema,
  ListingTaskPayload,
  ArticleTaskPayload,
  ProcessedTaskPayload,
} from "./schemas";
import {
  createQueueManager,
  QueueManager,
  QueueSettings,
  QueueSettingsInput,
} from "./queue";

export interface CrawlerTaskHandlers {
  collectListing: (payload: ListingTaskPayload) => Promise<number> | number;
  collectArticle: (payload: ArticleTaskPayload) => Promise<unknown> | unknown;
  forwardForProcessing: (payload: ProcessedTaskPayload) => Promise<unknown> | unknown;
}

const notImplemented = (name: keyof CrawlerTaskHandlers) => () => {
  throw new Error(`Crawler task handler '${name}' is not implemented`);
};

let handlers: CrawlerTaskHandlers = {
  collectListing: notImplemented("collectListing"),
  collectArticle: notImplemented("collectArticle"),
  forwardForProcessing: notImplemented("forwardForProcessing"),
};

export const registerCrawlerTaskHandlers = (overrides: Partial<CrawlerTaskHandlers>): void => {
  handlers = { ...handlers, ...overrides };
};

export interface ScheduleAsyncCrawlOptions {
  sourceId: string;
  env?: string;
  pageRange?: string | null;
  dateRange?: string | null;
  category?: string | null;
  settings?: QueueSettings | QueueSettingsInput;
  queueManager?: QueueManager;
}

export const scheduleAsyncCrawl = async ({
  sourceId,
  env = "development",
  pageRange,
  dateRange,
  category,
  settings,
  queueManager,
}: ScheduleAsyncCrawlOptions): Promise<string> => {
  const payload = ListingTaskPayloadSchema.parse({
    source_id: sourceId,
    env,
    page_range: pageRange ?? undefined,
    date_range: dateRange ?? undefined,
    category: category ?? undefined,
  });

  const manager = queueManager ?? createQueueManager({ settings });
  try {
    const job = await manager.enqueueListing(payload);
    return job.id;
  } finally {
    if (!queueManager) {
      await manager.close();
    }
  }
};

export const collectListing = async (payload: unknown): Promise<number> => {
  const data = ListingTaskPayloadSchema.parse(payload);
  const result = await handlers.collectListing(data);
  return typeof result === "number" ? result : 0;
};

export const collectArticle = async (payload: unknown): Promise<unknown> => {
  const data = ArticleTaskPayloadSchema.parse(payload);
  return handlers.collectArticle(data);
};

export const forwardForProcessing = async (payload: unknown): Promise<unknown> => {
  const data = ProcessedTaskPayloadSchema.parse(payload);
  return handlers.forwardForProcessing(data);
};
@@ -0,0 +1,92 @@
import IORedis from "ioredis";
import { Worker, QueueEvents } from "bullmq";

import {
  createQueueManager,
  QueueFactory,
  QueueManager,
  QueueSettings,
  QueueSettingsInput,
} from "./queue";
import { collectArticle, collectListing, forwardForProcessing } from "./tasks";

export interface WorkerOptions {
  queueNames?: string[];
  settings?: QueueSettings | QueueSettingsInput;
  connection?: IORedis;
  queueFactory?: QueueFactory;
  concurrency?: number;
  onError?: (error: Error) => void;
  queueManager?: QueueManager;
}

export interface WorkerHandle {
  readonly workers: Worker[];
  readonly events: QueueEvents[];
  close: () => Promise<void>;
}

export const startWorker = (options: WorkerOptions = {}): WorkerHandle => {
  const manager =
    options.queueManager ??
    createQueueManager({
      settings: options.settings,
      connection: options.connection,
      queueFactory: options.queueFactory,
    });

  // Workers must subscribe with the bare queue names plus the shared prefix
  // option; the prefixed names from iterQueueNames would make BullMQ look
  // under "bull:<prefix>:<name>" instead of the keys the queues write to.
  const queueNames = options.queueNames ?? [
    manager.settings.listing_queue,
    manager.settings.article_queue,
    manager.settings.processed_queue,
  ];
  const workers: Worker[] = [];
  const events: QueueEvents[] = [];

  const connection = manager.connection;
  const prefix = manager.settings.prefix;

  for (const queueName of queueNames) {
    const worker = new Worker(queueName, async (job) => {
      switch (job.name) {
        case "collect_listing":
          return collectListing(job.data);
        case "collect_article":
          return collectArticle(job.data);
        case "forward_for_processing":
          return forwardForProcessing(job.data);
        default:
          throw new Error(`Unknown job name: ${job.name}`);
      }
    }, {
      connection,
      prefix,
      concurrency: options.concurrency ?? 5,
    });

    if (options.onError) {
      worker.on("failed", (_, err) => options.onError?.(err as Error));
      worker.on("error", (err) => options.onError?.(err as Error));
    }

    const queueEvents = new QueueEvents(queueName, { connection, prefix });

    workers.push(worker);
    events.push(queueEvents);
  }

  return {
    workers,
    events,
    close: async () => {
      await Promise.all(workers.map((worker) => worker.close()));
      await Promise.all(events.map((event) => event.close()));
      if (!options.queueManager) {
        await manager.close();
      }
    },
  };
};
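
A sketch of how the pieces are meant to compose (the handler body is a hypothetical stand-in for the real crawling code):

import { registerCrawlerTaskHandlers, startWorker } from "@basango/crawler";

// Handlers are injected rather than imported, so the queue layer stays
// decoupled from the crawler implementation.
registerCrawlerTaskHandlers({
  collectListing: async (payload) => {
    console.log(`listing ${payload.source_id} (${payload.env})`);
    return 0; // number of listings collected
  },
});

const handle = startWorker({ concurrency: 2 });
// ...later, on shutdown:
await handle.close();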
@@ -0,0 +1,3 @@
export * from "./async/queue";
export * from "./async/tasks";
export * from "./async/worker";
@@ -0,0 +1,40 @@
import fs from "node:fs";

import type { RedisOptions } from "ioredis";
import { get_encoding, type TiktokenEncoding } from "tiktoken";

import type { ProjectPaths } from "./schema";

export const ensureDirectories = (paths: ProjectPaths): void => {
  for (const dir of [paths.data, paths.logs, paths.configs]) {
    if (!fs.existsSync(dir)) {
      fs.mkdirSync(dir, { recursive: true });
    }
  }
};

export const parseRedisUrl = (url: string): RedisOptions => {
  if (!url.startsWith("redis://")) {
    return {};
  }
  const parsed = new URL(url);
  return {
    host: parsed.hostname,
    port: Number(parsed.port || 6379),
    password: parsed.password || undefined,
    db: Number(parsed.pathname?.replace("/", "") || 0),
    // BullMQ workers require blocking connections with this set to null.
    maxRetriesPerRequest: null,
  };
};

export const countTokens = (text: string, encoding: TiktokenEncoding = "cl100k_base"): number => {
  try {
    const encoder = get_encoding(encoding);
    const tokens = encoder.encode(text);
    encoder.free();
    return tokens.length;
  } catch {
    return text.length;
  }
};
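
For reference, a worked example of the URL parsing (hypothetical credentials):

import { parseRedisUrl } from "@basango/crawler";

parseRedisUrl("redis://:secret@localhost:6380/2");
// => { host: "localhost", port: 6380, password: "secret", db: 2, maxRetriesPerRequest: null }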
@@ -0,0 +1,9 @@
{
  "extends": "../../tsconfig.base.json",
  "compilerOptions": {
    "rootDir": "src",
    "outDir": "dist"
  },
  "include": ["src"],
  "references": []
}
@@ -0,0 +1,9 @@
import { defineConfig } from "vitest/config";

export default defineConfig({
  test: {
    environment: "node",
    globals: true,
    include: ["src/**/*.test.ts"],
  },
});