feat(crawler): add configuration manager and cli scripts
This commit is contained in:
@@ -0,0 +1,152 @@
|
||||
{
|
||||
"fetch": {
|
||||
"client": {
|
||||
"timeout": 20,
|
||||
"user_agent": "Basango/0.1 (+https://github.com/bernard-ng/basango)",
|
||||
"follow_redirects": true,
|
||||
"verify_ssl": true,
|
||||
"rotate": true,
|
||||
"max_retries": 3,
|
||||
"backoff_initial": 1,
|
||||
"backoff_multiplier": 2,
|
||||
"backoff_max": 30,
|
||||
"respect_retry_after": true
|
||||
},
|
||||
"crawler": {
|
||||
"notify": false,
|
||||
"use_multi_threading": false,
|
||||
"max_workers": 5
|
||||
}
|
||||
},
|
||||
"logging": {
|
||||
"level": "INFO",
|
||||
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
"file_logging": true,
|
||||
"console_logging": true,
|
||||
"log_file": "pipeline.log",
|
||||
"max_log_size": 10485760,
|
||||
"backup_count": 5
|
||||
},
|
||||
"sources": {
|
||||
"html": [
|
||||
{
|
||||
"source_id": "radiookapi.net",
|
||||
"source_url": "https://www.radiookapi.net",
|
||||
"source_date": {
|
||||
"pattern": "/(\\d{2})/(\\d{2})/(\\d{4}) - (\\d{2}:\\d{2})/",
|
||||
"replacement": "$3-$2-$1 $4"
|
||||
},
|
||||
"source_selectors": {
|
||||
"articles": ".view-content > .views-row.content-row",
|
||||
"article_title": "h1.page-header",
|
||||
"article_link": ".views-field-title a",
|
||||
"article_body": ".field-name-body",
|
||||
"article_date": ".views-field-created",
|
||||
"article_categories": ".views-field-field-cat-gorie a",
|
||||
"pagination": "ul.pagination > li.pager-last > a"
|
||||
},
|
||||
"pagination_template": "actualite",
|
||||
"supports_categories": false,
|
||||
"requires_details": true,
|
||||
"requires_rate_limit": false
|
||||
},
|
||||
{
|
||||
"source_id": "7sur7.cd",
|
||||
"source_url": "https://7sur7.cd",
|
||||
"source_date": {
|
||||
"pattern": "/\\w{3} (\\d{2})/(\\d{2})/(\\d{4}) - (\\d{2}:\\d{2})/",
|
||||
"replacement": "$3-$2-$1 $4"
|
||||
},
|
||||
"categories": [
|
||||
"politique",
|
||||
"economie",
|
||||
"culture",
|
||||
"sport",
|
||||
"societe"
|
||||
],
|
||||
"source_selectors": {
|
||||
"articles": ".view-content > .row.views-row",
|
||||
"article_title": ".views-field-title a",
|
||||
"article_link": ".views-field-title a",
|
||||
"article_body": ".field.field--name-body",
|
||||
"article_date": ".views-field-created",
|
||||
"pagination": "ul.pagination > li.pager__item.pager__item--last > a"
|
||||
},
|
||||
"pagination_template": "index.php/category/{category}",
|
||||
"supports_categories": true,
|
||||
"requires_details": false,
|
||||
"requires_rate_limit": false
|
||||
},
|
||||
{
|
||||
"source_id": "mediacongo.net",
|
||||
"source_url": "https://www.mediacongo.net",
|
||||
"source_date": {
|
||||
"format": "%d.%m.%Y %H:%M"
|
||||
},
|
||||
"source_selectors": {
|
||||
"articles": ".for_aitems > .article_other_item",
|
||||
"article_title": "img",
|
||||
"article_link": "a:first-child",
|
||||
"article_categories": "a.color_link",
|
||||
"article_body": ".article_ttext",
|
||||
"article_date": ".article_other_about",
|
||||
"pagination": "div.pagination > div > a:last-child"
|
||||
},
|
||||
"pagination_template": "articles.html",
|
||||
"supports_categories": false,
|
||||
"requires_details": true,
|
||||
"requires_rate_limit": false
|
||||
},
|
||||
{
|
||||
"source_id": "actualite.cd",
|
||||
"source_url": "https://actualite.cd",
|
||||
"source_date": {
|
||||
"pattern": "/(\\d{1}) (\\d{1,2}) (\\d{2}) (\\d{4}) - (\\d{2}:\\d{2})/",
|
||||
"replacement": "$4-$3-$2 $5"
|
||||
},
|
||||
"source_selectors": {
|
||||
"articles": "#views-bootstrap-taxonomy-term-page-2 > div > div",
|
||||
"article_title": "#actu-titre a",
|
||||
"article_link": "#actu-titre a",
|
||||
"article_categories": "#actu-cat a",
|
||||
"article_body": ".views-field.views-field-body",
|
||||
"article_date": "#p-date"
|
||||
},
|
||||
"pagination_template": "actualite",
|
||||
"supports_categories": false,
|
||||
"requires_details": true,
|
||||
"requires_rate_limit": false
|
||||
}
|
||||
],
|
||||
"wordpress": [
|
||||
{ "source_id": "beto.cd", "source_url": "https://beto.cd", "requires_rate_limit": true },
|
||||
{ "source_id": "newscd.net", "source_url": "https://newscd.net" },
|
||||
{ "source_id": "africanewsrdc.net", "source_url": "https://www.africanewsrdc.net" },
|
||||
{ "source_id": "angazainstitute.ac.cd", "source_url": "https://angazainstitute.ac.cd" },
|
||||
{ "source_id": "b-onetv.cd", "source_url": "https://b-onetv.cd" },
|
||||
{ "source_id": "bukavufm.com", "source_url": "https://bukavufm.com" },
|
||||
{ "source_id": "changement7.net", "source_url": "https://changement7.net" },
|
||||
{ "source_id": "congoactu.net", "source_url": "https://congoactu.net" },
|
||||
{ "source_id": "congoindependant.com", "source_url": "https://www.congoindependant.com" },
|
||||
{ "source_id": "congoquotidien.com", "source_url": "https://www.congoquotidien.com" },
|
||||
{ "source_id": "cumulard.cd", "source_url": "https://www.cumulard.cd" },
|
||||
{ "source_id": "environews-rdc.net", "source_url": "https://environews-rdc.net" },
|
||||
{ "source_id": "freemediardc.info", "source_url": "https://www.freemediardc.info" },
|
||||
{ "source_id": "geopolismagazine.org", "source_url": "https://geopolismagazine.org" },
|
||||
{ "source_id": "habarirdc.net", "source_url": "https://habarirdc.net" },
|
||||
{ "source_id": "infordc.com", "source_url": "https://infordc.com" },
|
||||
{ "source_id": "kilalopress.net", "source_url": "https://kilalopress.net" },
|
||||
{ "source_id": "laprosperiteonline.net", "source_url": "https://laprosperiteonline.net" },
|
||||
{ "source_id": "laprunellerdc.cd", "source_url": "https://laprunellerdc.cd" },
|
||||
{ "source_id": "lesmedias.net", "source_url": "https://lesmedias.net" },
|
||||
{ "source_id": "lesvolcansnews.net", "source_url": "https://lesvolcansnews.net" },
|
||||
{ "source_id": "netic-news.net", "source_url": "https://www.netic-news.net" },
|
||||
{ "source_id": "objectif-infos.cd", "source_url": "https://objectif-infos.cd" },
|
||||
{ "source_id": "scooprdc.net", "source_url": "https://scooprdc.net" },
|
||||
{ "source_id": "journaldekinshasa.com", "source_url": "https://www.journaldekinshasa.com" },
|
||||
{ "source_id": "lepotentiel.cd", "source_url": "https://lepotentiel.cd" },
|
||||
{ "source_id": "acturdc.com", "source_url": "https://acturdc.com" },
|
||||
{ "source_id": "matininfos.net", "source_url": "https://matininfos.net" }
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
{
|
||||
"fetch": {
|
||||
"client": {
|
||||
"timeout": 20,
|
||||
"user_agent": "Basango/0.1 (+https://github.com/bernard-ng/basango)",
|
||||
"follow_redirects": true,
|
||||
"verify_ssl": true,
|
||||
"rotate": true,
|
||||
"max_retries": 3,
|
||||
"backoff_initial": 1,
|
||||
"backoff_multiplier": 2,
|
||||
"backoff_max": 30,
|
||||
"respect_retry_after": true
|
||||
},
|
||||
"crawler": {
|
||||
"notify": false,
|
||||
"use_multi_threading": false,
|
||||
"max_workers": 5
|
||||
}
|
||||
},
|
||||
"logging": {
|
||||
"level": "ERROR",
|
||||
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
"file_logging": true,
|
||||
"console_logging": true,
|
||||
"log_file": "pipeline.log",
|
||||
"max_log_size": 10485760,
|
||||
"backup_count": 5
|
||||
},
|
||||
"sources": {
|
||||
"html": [
|
||||
{
|
||||
"source_id": "radiookapi.net",
|
||||
"source_url": "https://www.radiookapi.net",
|
||||
"source_date": {
|
||||
"pattern": "/(\\d{2})/(\\d{2})/(\\d{4}) - (\\d{2}:\\d{2})/",
|
||||
"replacement": "$3-$2-$1 $4"
|
||||
},
|
||||
"source_selectors": {
|
||||
"articles": ".view-content > .views-row.content-row",
|
||||
"article_title": ".views-field-title a",
|
||||
"article_link": ".views-field-title a",
|
||||
"article_body": ".field-name-body",
|
||||
"article_date": ".views-field-created",
|
||||
"article_categories": ".views-field-field-cat-gorie a",
|
||||
"pagination": "ul.pagination > li a(:last-child)"
|
||||
},
|
||||
"pagination_template": "/actualite?page={page}",
|
||||
"supports_categories": false,
|
||||
"requires_details": false,
|
||||
"requires_rate_limit": false
|
||||
},
|
||||
{
|
||||
"source_id": "7sur7.cd",
|
||||
"source_url": "https://7sur7.cd",
|
||||
"source_date": {
|
||||
"pattern": "/\\w{3} (\\d{2})/(\\d{2})/(\\d{4}) - (\\d{2}:\\d{2})/",
|
||||
"replacement": "$3-$2-$1 $4"
|
||||
},
|
||||
"categories": [
|
||||
"politique",
|
||||
"economie",
|
||||
"culture",
|
||||
"sport",
|
||||
"societe"
|
||||
],
|
||||
"source_selectors": {
|
||||
"articles": ".view-content > .row.views-row",
|
||||
"article_title": ".views-field-title a",
|
||||
"article_link": ".views-field-title a",
|
||||
"article_body": ".field.field--name-body",
|
||||
"article_date": ".views-field-created",
|
||||
"pagination": "ul.pagination > li a(:last-child)"
|
||||
},
|
||||
"pagination_template": "/index.php/category/{category}?page={page}",
|
||||
"supports_categories": true,
|
||||
"requires_details": false,
|
||||
"requires_rate_limit": false
|
||||
},
|
||||
{
|
||||
"source_id": "mediacongo.net",
|
||||
"source_url": "https://mediacongo.net",
|
||||
"source_date": {
|
||||
"format": "%d.%m.%Y %H:%M"
|
||||
},
|
||||
"source_selectors": {
|
||||
"articles": ".for_aitems > .article_other_item",
|
||||
"article_title": "img",
|
||||
"article_link": "a(:first-child)",
|
||||
"article_categories": "a.color_link",
|
||||
"article_body": ".article_ttext",
|
||||
"article_date": ".article_other_about",
|
||||
"pagination": ".nav > a(:last-child)"
|
||||
},
|
||||
"pagination_template": "/articles.html?page={page}",
|
||||
"supports_categories": false,
|
||||
"requires_details": true,
|
||||
"requires_rate_limit": false
|
||||
},
|
||||
{
|
||||
"source_id": "actualite.cd",
|
||||
"source_url": "https://actualite.cd",
|
||||
"source_date": {
|
||||
"pattern": "/(\\d{1}) (\\d{1,2}) (\\d{2}) (\\d{4}) - (\\d{2}:\\d{2})/",
|
||||
"replacement": "$4-$3-$2 $5"
|
||||
},
|
||||
"source_selectors": {
|
||||
"articles": "#views-bootstrap-taxonomy-term-page-2 > div > div",
|
||||
"article_title": "#actu-titre a",
|
||||
"article_link": "#actu-titre a",
|
||||
"article_categories": "#actu-cat a",
|
||||
"article_body": ".views-field.views-field-body",
|
||||
"article_date": "#p-date"
|
||||
},
|
||||
"pagination_template": "/actualite?page={page}",
|
||||
"supports_categories": false,
|
||||
"requires_details": true,
|
||||
"requires_rate_limit": false
|
||||
}
|
||||
],
|
||||
"wordpress": [
|
||||
{ "source_id": "beto.cd", "source_url": "https://beto.cd", "requires_rate_limit": true },
|
||||
{ "source_id": "newscd.net", "source_url": "https://newscd.net" },
|
||||
{ "source_id": "africanewsrdc.net", "source_url": "https://www.africanewsrdc.net" },
|
||||
{ "source_id": "angazainstitute.ac.cd", "source_url": "https://angazainstitute.ac.cd" },
|
||||
{ "source_id": "b-onetv.cd", "source_url": "https://b-onetv.cd" },
|
||||
{ "source_id": "bukavufm.com", "source_url": "https://bukavufm.com" },
|
||||
{ "source_id": "changement7.net", "source_url": "https://changement7.net" },
|
||||
{ "source_id": "congoactu.net", "source_url": "https://congoactu.net" },
|
||||
{ "source_id": "congoindependant.com", "source_url": "https://www.congoindependant.com" },
|
||||
{ "source_id": "congoquotidien.com", "source_url": "https://www.congoquotidien.com" },
|
||||
{ "source_id": "cumulard.cd", "source_url": "https://www.cumulard.cd" },
|
||||
{ "source_id": "environews-rdc.net", "source_url": "https://environews-rdc.net" },
|
||||
{ "source_id": "freemediardc.info", "source_url": "https://www.freemediardc.info" },
|
||||
{ "source_id": "geopolismagazine.org", "source_url": "https://geopolismagazine.org" },
|
||||
{ "source_id": "habarirdc.net", "source_url": "https://habarirdc.net" },
|
||||
{ "source_id": "infordc.com", "source_url": "https://infordc.com" },
|
||||
{ "source_id": "kilalopress.net", "source_url": "https://kilalopress.net" },
|
||||
{ "source_id": "laprosperiteonline.net", "source_url": "https://laprosperiteonline.net" },
|
||||
{ "source_id": "laprunellerdc.cd", "source_url": "https://laprunellerdc.cd" },
|
||||
{ "source_id": "lesmedias.net", "source_url": "https://lesmedias.net" },
|
||||
{ "source_id": "lesvolcansnews.net", "source_url": "https://lesvolcansnews.net" },
|
||||
{ "source_id": "netic-news.net", "source_url": "https://www.netic-news.net" },
|
||||
{ "source_id": "objectif-infos.cd", "source_url": "https://objectif-infos.cd" },
|
||||
{ "source_id": "scooprdc.net", "source_url": "https://scooprdc.net" },
|
||||
{ "source_id": "journaldekinshasa.com", "source_url": "https://www.journaldekinshasa.com" },
|
||||
{ "source_id": "lepotentiel.cd", "source_url": "https://lepotentiel.cd" },
|
||||
{ "source_id": "acturdc.com", "source_url": "https://acturdc.com" },
|
||||
{ "source_id": "matininfos.net", "source_url": "https://matininfos.net" }
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,9 @@
|
||||
"types": "dist/index.d.ts",
|
||||
"scripts": {
|
||||
"build": "tsc -b",
|
||||
"test": "vitest --run"
|
||||
"test": "vitest --run",
|
||||
"queue": "bun run src/scripts/queue.ts",
|
||||
"worker": "bun run src/scripts/worker.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
"@basango/logger": "workspace:*",
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
|
||||
import { logger } from "@basango/logger";
|
||||
|
||||
import {
|
||||
PipelineConfig,
|
||||
PipelineConfigSchema,
|
||||
@@ -16,8 +18,8 @@ export interface LoadConfigOptions {
|
||||
}
|
||||
|
||||
const DEFAULT_CONFIG_FILES = [
|
||||
path.join(process.cwd(), "config", "pipeline.json"),
|
||||
path.join(process.cwd(), "pipeline.json"),
|
||||
path.join(process.cwd(), "config", "pipeline.json"),
|
||||
path.join(process.cwd(), "pipeline.json"),
|
||||
];
|
||||
|
||||
const readJsonFile = (filePath: string): unknown => {
|
||||
@@ -25,10 +27,10 @@ const readJsonFile = (filePath: string): unknown => {
|
||||
return contents.trim() === "" ? {} : JSON.parse(contents);
|
||||
};
|
||||
|
||||
const locateConfigFile = (explicit?: string): string => {
|
||||
if (explicit && fs.existsSync(explicit)) {
|
||||
return explicit;
|
||||
}
|
||||
export const locateConfigFile = (explicit?: string): string => {
|
||||
if (explicit && fs.existsSync(explicit)) {
|
||||
return explicit;
|
||||
}
|
||||
|
||||
for (const candidate of DEFAULT_CONFIG_FILES) {
|
||||
if (fs.existsSync(candidate)) {
|
||||
@@ -81,11 +83,104 @@ export const loadConfig = (options: LoadConfigOptions = {}): PipelineConfig => {
|
||||
};
|
||||
|
||||
export const dumpConfig = (
|
||||
config: PipelineConfig,
|
||||
targetPath?: string,
|
||||
config: PipelineConfig,
|
||||
targetPath?: string,
|
||||
): void => {
|
||||
const destination = targetPath ?? locateConfigFile();
|
||||
const normalized = PipelineConfigSchema.parse(config);
|
||||
fs.mkdirSync(path.dirname(destination), { recursive: true });
|
||||
fs.writeFileSync(destination, JSON.stringify(normalized, null, 2));
|
||||
const destination = targetPath ?? locateConfigFile();
|
||||
const normalized = PipelineConfigSchema.parse(config);
|
||||
fs.mkdirSync(path.dirname(destination), { recursive: true });
|
||||
fs.writeFileSync(destination, JSON.stringify(normalized, null, 2));
|
||||
};
|
||||
|
||||
export interface PipelineConfigManagerOptions {
|
||||
configPath?: string;
|
||||
env?: string;
|
||||
autoLoad?: boolean;
|
||||
}
|
||||
|
||||
export class PipelineConfigManager {
|
||||
private readonly explicitPath?: string;
|
||||
|
||||
private readonly defaultEnv: string;
|
||||
|
||||
private cache?: PipelineConfig;
|
||||
|
||||
constructor(options: PipelineConfigManagerOptions = {}) {
|
||||
this.explicitPath = options.configPath;
|
||||
this.defaultEnv = options.env ?? "development";
|
||||
|
||||
if (options.autoLoad !== false) {
|
||||
this.cache = loadConfig({
|
||||
configPath: this.explicitPath,
|
||||
env: this.defaultEnv,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
get(env?: string): PipelineConfig {
|
||||
const resolvedEnv = env ?? this.defaultEnv;
|
||||
|
||||
if (resolvedEnv !== this.defaultEnv) {
|
||||
return loadConfig({
|
||||
configPath: this.explicitPath,
|
||||
env: resolvedEnv,
|
||||
});
|
||||
}
|
||||
|
||||
if (!this.cache) {
|
||||
this.cache = loadConfig({
|
||||
configPath: this.explicitPath,
|
||||
env: resolvedEnv,
|
||||
});
|
||||
}
|
||||
|
||||
return this.cache;
|
||||
}
|
||||
|
||||
reload(env?: string): PipelineConfig {
|
||||
const resolvedEnv = env ?? this.defaultEnv;
|
||||
const config = loadConfig({
|
||||
configPath: this.explicitPath,
|
||||
env: resolvedEnv,
|
||||
});
|
||||
|
||||
if (resolvedEnv === this.defaultEnv) {
|
||||
this.cache = config;
|
||||
}
|
||||
|
||||
return config;
|
||||
}
|
||||
|
||||
ensureDirectories(config?: PipelineConfig): PipelineConfig {
|
||||
const pipeline = config ?? this.get();
|
||||
ensureDirectories(pipeline.paths);
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
setupLogging(config?: PipelineConfig): void {
|
||||
const pipeline = config ?? this.get();
|
||||
this.ensureDirectories(pipeline);
|
||||
|
||||
const level = pipeline.logging.level.toLowerCase();
|
||||
process.env.LOG_LEVEL = level;
|
||||
const normalizedLevel = level as typeof logger.level;
|
||||
logger.level = normalizedLevel;
|
||||
|
||||
if (pipeline.logging.file_logging) {
|
||||
const logDir = pipeline.paths.logs;
|
||||
const destination = path.join(
|
||||
logDir,
|
||||
pipeline.logging.log_file,
|
||||
);
|
||||
fs.mkdirSync(path.dirname(destination), { recursive: true });
|
||||
if (!fs.existsSync(destination)) {
|
||||
fs.writeFileSync(destination, "");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resolveConfigPath(env?: string): string {
|
||||
const base = locateConfigFile(this.explicitPath);
|
||||
return resolveConfigPath(base, env ?? this.defaultEnv);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
import { parseArgs } from "node:util";
|
||||
|
||||
import { logger } from "@basango/logger";
|
||||
|
||||
import { PipelineConfigManager } from "@crawler/config";
|
||||
import { scheduleAsyncCrawl } from "@crawler/services/crawler";
|
||||
import { createQueueSettings } from "@crawler/services/crawler/async/queue";
|
||||
|
||||
interface QueueCliOptions {
|
||||
"source-id"?: string;
|
||||
env: string;
|
||||
"page-range"?: string;
|
||||
"date-range"?: string;
|
||||
category?: string;
|
||||
"redis-url"?: string;
|
||||
help?: boolean;
|
||||
}
|
||||
|
||||
const usage = `Usage: bun run src/scripts/queue.ts -- --source-id <id> [options]\n\nOptions:\n --env <env> Environment to load (default: development)\n --page-range <range> Optional page range filter (e.g. 1:5)\n --date-range <range> Optional date range filter (e.g. 2024-01-01:2024-01-31)\n --category <slug> Optional category to crawl\n --redis-url <url> Override Redis connection URL\n -h, --help Show this message`;
|
||||
|
||||
const parseCliArgs = (): QueueCliOptions => {
|
||||
const { values } = parseArgs({
|
||||
options: {
|
||||
"source-id": { type: "string" },
|
||||
env: { type: "string", default: "development" },
|
||||
"page-range": { type: "string" },
|
||||
"date-range": { type: "string" },
|
||||
category: { type: "string" },
|
||||
"redis-url": { type: "string" },
|
||||
help: { type: "boolean", short: "h" },
|
||||
},
|
||||
});
|
||||
|
||||
return values as QueueCliOptions;
|
||||
};
|
||||
|
||||
const main = async (): Promise<void> => {
|
||||
const options = parseCliArgs();
|
||||
|
||||
if (options.help || !options["source-id"]) {
|
||||
console.log(usage);
|
||||
if (!options["source-id"]) {
|
||||
process.exitCode = 1;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const env = options.env ?? "development";
|
||||
const manager = new PipelineConfigManager({ env });
|
||||
const config = manager.ensureDirectories();
|
||||
manager.setupLogging(config);
|
||||
|
||||
const settings = options["redis-url"]
|
||||
? createQueueSettings({ redis_url: options["redis-url"] })
|
||||
: undefined;
|
||||
|
||||
try {
|
||||
const jobId = await scheduleAsyncCrawl({
|
||||
sourceId: options["source-id"],
|
||||
env,
|
||||
pageRange: options["page-range"] ?? null,
|
||||
dateRange: options["date-range"] ?? null,
|
||||
category: options.category ?? null,
|
||||
settings,
|
||||
});
|
||||
|
||||
logger.info(
|
||||
{
|
||||
jobId,
|
||||
sourceId: options["source-id"],
|
||||
env,
|
||||
},
|
||||
"Scheduled asynchronous crawl job",
|
||||
);
|
||||
console.log(
|
||||
`Scheduled async crawl job ${jobId} for source '${options["source-id"]}' (env=${env})`,
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
error instanceof Error ? error : { error },
|
||||
"Failed to schedule crawl job",
|
||||
);
|
||||
console.error(`Failed to schedule crawl job: ${(error as Error).message}`);
|
||||
process.exitCode = 1;
|
||||
}
|
||||
};
|
||||
|
||||
void main();
|
||||
@@ -0,0 +1,112 @@
|
||||
import { parseArgs } from "node:util";
|
||||
|
||||
import { logger } from "@basango/logger";
|
||||
|
||||
import { PipelineConfigManager } from "@crawler/config";
|
||||
import { createQueueManager, createQueueSettings } from "@crawler/services/crawler/async/queue";
|
||||
import { startWorker } from "@crawler/services/crawler/async/worker";
|
||||
|
||||
interface WorkerCliOptions {
|
||||
env: string;
|
||||
queue?: string[];
|
||||
concurrency?: string;
|
||||
"redis-url"?: string;
|
||||
help?: boolean;
|
||||
}
|
||||
|
||||
const usage = `Usage: bun run src/scripts/worker.ts [options]\n\nOptions:\n --env <env> Environment to load (default: development)\n -q, --queue <name> Queue name to listen on (repeatable)\n --concurrency <number> Number of concurrent jobs per worker\n --redis-url <url> Override Redis connection URL\n -h, --help Show this message`;
|
||||
|
||||
const parseCliArgs = (): WorkerCliOptions => {
|
||||
const { values } = parseArgs({
|
||||
options: {
|
||||
env: { type: "string", default: "development" },
|
||||
queue: { type: "string", multiple: true, short: "q" },
|
||||
concurrency: { type: "string" },
|
||||
"redis-url": { type: "string" },
|
||||
help: { type: "boolean", short: "h" },
|
||||
},
|
||||
});
|
||||
|
||||
return values as WorkerCliOptions;
|
||||
};
|
||||
|
||||
const parseConcurrency = (value?: string): number | undefined => {
|
||||
if (!value) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const parsed = Number.parseInt(value, 10);
|
||||
if (Number.isNaN(parsed) || parsed <= 0) {
|
||||
throw new Error(`Invalid concurrency value: ${value}`);
|
||||
}
|
||||
|
||||
return parsed;
|
||||
};
|
||||
|
||||
const main = async (): Promise<void> => {
|
||||
const options = parseCliArgs();
|
||||
|
||||
if (options.help) {
|
||||
console.log(usage);
|
||||
return;
|
||||
}
|
||||
|
||||
const env = options.env ?? "development";
|
||||
const manager = new PipelineConfigManager({ env });
|
||||
const config = manager.ensureDirectories();
|
||||
manager.setupLogging(config);
|
||||
|
||||
let concurrency: number | undefined;
|
||||
try {
|
||||
concurrency = parseConcurrency(options.concurrency);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
error instanceof Error ? error : { error },
|
||||
"Invalid concurrency value provided",
|
||||
);
|
||||
process.exitCode = 1;
|
||||
return;
|
||||
}
|
||||
const settings = options["redis-url"]
|
||||
? createQueueSettings({ redis_url: options["redis-url"] })
|
||||
: undefined;
|
||||
const queueManager = createQueueManager({ settings });
|
||||
|
||||
const queueNames = options.queue?.length
|
||||
? options.queue.map((name) => queueManager.queueName(name))
|
||||
: undefined;
|
||||
|
||||
const handle = startWorker({
|
||||
queueManager,
|
||||
queueNames,
|
||||
concurrency,
|
||||
});
|
||||
|
||||
const shutdown = async (signal: NodeJS.Signals) => {
|
||||
logger.info({ signal }, "Received shutdown signal, draining workers");
|
||||
try {
|
||||
await handle.close();
|
||||
} finally {
|
||||
await queueManager.close();
|
||||
process.exit(0);
|
||||
}
|
||||
};
|
||||
|
||||
process.once("SIGINT", (signal) => {
|
||||
void shutdown(signal);
|
||||
});
|
||||
process.once("SIGTERM", (signal) => {
|
||||
void shutdown(signal);
|
||||
});
|
||||
|
||||
logger.info(
|
||||
{
|
||||
env,
|
||||
queueNames: queueNames ?? queueManager.iterQueueNames(),
|
||||
concurrency: concurrency ?? "default",
|
||||
},
|
||||
"Crawler workers started",
|
||||
);
|
||||
};
|
||||
|
||||
void main();
|
||||
@@ -3,7 +3,7 @@ import fs from "node:fs";
|
||||
import type { RedisOptions } from "ioredis";
|
||||
import { get_encoding } from "tiktoken";
|
||||
|
||||
import type { ProjectPaths } from "@/schema";
|
||||
import type { ProjectPaths } from "@crawler/schema";
|
||||
|
||||
export const ensureDirectories = (paths: ProjectPaths): void => {
|
||||
for (const dir of [paths.data, paths.logs, paths.configs]) {
|
||||
|
||||
@@ -4,8 +4,8 @@
|
||||
"rootDir": "src",
|
||||
"outDir": "dist",
|
||||
"paths": {
|
||||
"@basango/crawler": ["./src/index.ts"],
|
||||
"@basango/crawler/*": ["./src/*"]
|
||||
"@crawler": ["./src/index.ts"],
|
||||
"@crawler/*": ["./src/*"]
|
||||
}
|
||||
},
|
||||
"include": ["src"],
|
||||
|
||||
Reference in New Issue
Block a user