diff --git a/basango/apps/crawler/config/pipeline.json b/basango/apps/crawler/config/pipeline.json new file mode 100644 index 0000000..1182893 --- /dev/null +++ b/basango/apps/crawler/config/pipeline.json @@ -0,0 +1,152 @@ +{ + "fetch": { + "client": { + "timeout": 20, + "user_agent": "Basango/0.1 (+https://github.com/bernard-ng/basango)", + "follow_redirects": true, + "verify_ssl": true, + "rotate": true, + "max_retries": 3, + "backoff_initial": 1, + "backoff_multiplier": 2, + "backoff_max": 30, + "respect_retry_after": true + }, + "crawler": { + "notify": false, + "use_multi_threading": false, + "max_workers": 5 + } + }, + "logging": { + "level": "INFO", + "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + "file_logging": true, + "console_logging": true, + "log_file": "pipeline.log", + "max_log_size": 10485760, + "backup_count": 5 + }, + "sources": { + "html": [ + { + "source_id": "radiookapi.net", + "source_url": "https://www.radiookapi.net", + "source_date": { + "pattern": "/(\\d{2})/(\\d{2})/(\\d{4}) - (\\d{2}:\\d{2})/", + "replacement": "$3-$2-$1 $4" + }, + "source_selectors": { + "articles": ".view-content > .views-row.content-row", + "article_title": "h1.page-header", + "article_link": ".views-field-title a", + "article_body": ".field-name-body", + "article_date": ".views-field-created", + "article_categories": ".views-field-field-cat-gorie a", + "pagination": "ul.pagination > li.pager-last > a" + }, + "pagination_template": "actualite", + "supports_categories": false, + "requires_details": true, + "requires_rate_limit": false + }, + { + "source_id": "7sur7.cd", + "source_url": "https://7sur7.cd", + "source_date": { + "pattern": "/\\w{3} (\\d{2})/(\\d{2})/(\\d{4}) - (\\d{2}:\\d{2})/", + "replacement": "$3-$2-$1 $4" + }, + "categories": [ + "politique", + "economie", + "culture", + "sport", + "societe" + ], + "source_selectors": { + "articles": ".view-content > .row.views-row", + "article_title": ".views-field-title a", + "article_link": ".views-field-title a", + "article_body": ".field.field--name-body", + "article_date": ".views-field-created", + "pagination": "ul.pagination > li.pager__item.pager__item--last > a" + }, + "pagination_template": "index.php/category/{category}", + "supports_categories": true, + "requires_details": false, + "requires_rate_limit": false + }, + { + "source_id": "mediacongo.net", + "source_url": "https://www.mediacongo.net", + "source_date": { + "format": "%d.%m.%Y %H:%M" + }, + "source_selectors": { + "articles": ".for_aitems > .article_other_item", + "article_title": "img", + "article_link": "a:first-child", + "article_categories": "a.color_link", + "article_body": ".article_ttext", + "article_date": ".article_other_about", + "pagination": "div.pagination > div > a:last-child" + }, + "pagination_template": "articles.html", + "supports_categories": false, + "requires_details": true, + "requires_rate_limit": false + }, + { + "source_id": "actualite.cd", + "source_url": "https://actualite.cd", + "source_date": { + "pattern": "/(\\d{1}) (\\d{1,2}) (\\d{2}) (\\d{4}) - (\\d{2}:\\d{2})/", + "replacement": "$4-$3-$2 $5" + }, + "source_selectors": { + "articles": "#views-bootstrap-taxonomy-term-page-2 > div > div", + "article_title": "#actu-titre a", + "article_link": "#actu-titre a", + "article_categories": "#actu-cat a", + "article_body": ".views-field.views-field-body", + "article_date": "#p-date" + }, + "pagination_template": "actualite", + "supports_categories": false, + "requires_details": true, + "requires_rate_limit": false + } + ], + "wordpress": [ + { "source_id": "beto.cd", "source_url": "https://beto.cd", "requires_rate_limit": true }, + { "source_id": "newscd.net", "source_url": "https://newscd.net" }, + { "source_id": "africanewsrdc.net", "source_url": "https://www.africanewsrdc.net" }, + { "source_id": "angazainstitute.ac.cd", "source_url": "https://angazainstitute.ac.cd" }, + { "source_id": "b-onetv.cd", "source_url": "https://b-onetv.cd" }, + { "source_id": "bukavufm.com", "source_url": "https://bukavufm.com" }, + { "source_id": "changement7.net", "source_url": "https://changement7.net" }, + { "source_id": "congoactu.net", "source_url": "https://congoactu.net" }, + { "source_id": "congoindependant.com", "source_url": "https://www.congoindependant.com" }, + { "source_id": "congoquotidien.com", "source_url": "https://www.congoquotidien.com" }, + { "source_id": "cumulard.cd", "source_url": "https://www.cumulard.cd" }, + { "source_id": "environews-rdc.net", "source_url": "https://environews-rdc.net" }, + { "source_id": "freemediardc.info", "source_url": "https://www.freemediardc.info" }, + { "source_id": "geopolismagazine.org", "source_url": "https://geopolismagazine.org" }, + { "source_id": "habarirdc.net", "source_url": "https://habarirdc.net" }, + { "source_id": "infordc.com", "source_url": "https://infordc.com" }, + { "source_id": "kilalopress.net", "source_url": "https://kilalopress.net" }, + { "source_id": "laprosperiteonline.net", "source_url": "https://laprosperiteonline.net" }, + { "source_id": "laprunellerdc.cd", "source_url": "https://laprunellerdc.cd" }, + { "source_id": "lesmedias.net", "source_url": "https://lesmedias.net" }, + { "source_id": "lesvolcansnews.net", "source_url": "https://lesvolcansnews.net" }, + { "source_id": "netic-news.net", "source_url": "https://www.netic-news.net" }, + { "source_id": "objectif-infos.cd", "source_url": "https://objectif-infos.cd" }, + { "source_id": "scooprdc.net", "source_url": "https://scooprdc.net" }, + { "source_id": "journaldekinshasa.com", "source_url": "https://www.journaldekinshasa.com" }, + { "source_id": "lepotentiel.cd", "source_url": "https://lepotentiel.cd" }, + { "source_id": "acturdc.com", "source_url": "https://acturdc.com" }, + { "source_id": "matininfos.net", "source_url": "https://matininfos.net" } + ] + } +} diff --git a/basango/apps/crawler/config/pipeline.prod.json b/basango/apps/crawler/config/pipeline.prod.json new file mode 100644 index 0000000..f7de292 --- /dev/null +++ b/basango/apps/crawler/config/pipeline.prod.json @@ -0,0 +1,152 @@ +{ + "fetch": { + "client": { + "timeout": 20, + "user_agent": "Basango/0.1 (+https://github.com/bernard-ng/basango)", + "follow_redirects": true, + "verify_ssl": true, + "rotate": true, + "max_retries": 3, + "backoff_initial": 1, + "backoff_multiplier": 2, + "backoff_max": 30, + "respect_retry_after": true + }, + "crawler": { + "notify": false, + "use_multi_threading": false, + "max_workers": 5 + } + }, + "logging": { + "level": "ERROR", + "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + "file_logging": true, + "console_logging": true, + "log_file": "pipeline.log", + "max_log_size": 10485760, + "backup_count": 5 + }, + "sources": { + "html": [ + { + "source_id": "radiookapi.net", + "source_url": "https://www.radiookapi.net", + "source_date": { + "pattern": "/(\\d{2})/(\\d{2})/(\\d{4}) - (\\d{2}:\\d{2})/", + "replacement": "$3-$2-$1 $4" + }, + "source_selectors": { + "articles": ".view-content > .views-row.content-row", + "article_title": ".views-field-title a", + "article_link": ".views-field-title a", + "article_body": ".field-name-body", + "article_date": ".views-field-created", + "article_categories": ".views-field-field-cat-gorie a", + "pagination": "ul.pagination > li a(:last-child)" + }, + "pagination_template": "/actualite?page={page}", + "supports_categories": false, + "requires_details": false, + "requires_rate_limit": false + }, + { + "source_id": "7sur7.cd", + "source_url": "https://7sur7.cd", + "source_date": { + "pattern": "/\\w{3} (\\d{2})/(\\d{2})/(\\d{4}) - (\\d{2}:\\d{2})/", + "replacement": "$3-$2-$1 $4" + }, + "categories": [ + "politique", + "economie", + "culture", + "sport", + "societe" + ], + "source_selectors": { + "articles": ".view-content > .row.views-row", + "article_title": ".views-field-title a", + "article_link": ".views-field-title a", + "article_body": ".field.field--name-body", + "article_date": ".views-field-created", + "pagination": "ul.pagination > li a(:last-child)" + }, + "pagination_template": "/index.php/category/{category}?page={page}", + "supports_categories": true, + "requires_details": false, + "requires_rate_limit": false + }, + { + "source_id": "mediacongo.net", + "source_url": "https://mediacongo.net", + "source_date": { + "format": "%d.%m.%Y %H:%M" + }, + "source_selectors": { + "articles": ".for_aitems > .article_other_item", + "article_title": "img", + "article_link": "a(:first-child)", + "article_categories": "a.color_link", + "article_body": ".article_ttext", + "article_date": ".article_other_about", + "pagination": ".nav > a(:last-child)" + }, + "pagination_template": "/articles.html?page={page}", + "supports_categories": false, + "requires_details": true, + "requires_rate_limit": false + }, + { + "source_id": "actualite.cd", + "source_url": "https://actualite.cd", + "source_date": { + "pattern": "/(\\d{1}) (\\d{1,2}) (\\d{2}) (\\d{4}) - (\\d{2}:\\d{2})/", + "replacement": "$4-$3-$2 $5" + }, + "source_selectors": { + "articles": "#views-bootstrap-taxonomy-term-page-2 > div > div", + "article_title": "#actu-titre a", + "article_link": "#actu-titre a", + "article_categories": "#actu-cat a", + "article_body": ".views-field.views-field-body", + "article_date": "#p-date" + }, + "pagination_template": "/actualite?page={page}", + "supports_categories": false, + "requires_details": true, + "requires_rate_limit": false + } + ], + "wordpress": [ + { "source_id": "beto.cd", "source_url": "https://beto.cd", "requires_rate_limit": true }, + { "source_id": "newscd.net", "source_url": "https://newscd.net" }, + { "source_id": "africanewsrdc.net", "source_url": "https://www.africanewsrdc.net" }, + { "source_id": "angazainstitute.ac.cd", "source_url": "https://angazainstitute.ac.cd" }, + { "source_id": "b-onetv.cd", "source_url": "https://b-onetv.cd" }, + { "source_id": "bukavufm.com", "source_url": "https://bukavufm.com" }, + { "source_id": "changement7.net", "source_url": "https://changement7.net" }, + { "source_id": "congoactu.net", "source_url": "https://congoactu.net" }, + { "source_id": "congoindependant.com", "source_url": "https://www.congoindependant.com" }, + { "source_id": "congoquotidien.com", "source_url": "https://www.congoquotidien.com" }, + { "source_id": "cumulard.cd", "source_url": "https://www.cumulard.cd" }, + { "source_id": "environews-rdc.net", "source_url": "https://environews-rdc.net" }, + { "source_id": "freemediardc.info", "source_url": "https://www.freemediardc.info" }, + { "source_id": "geopolismagazine.org", "source_url": "https://geopolismagazine.org" }, + { "source_id": "habarirdc.net", "source_url": "https://habarirdc.net" }, + { "source_id": "infordc.com", "source_url": "https://infordc.com" }, + { "source_id": "kilalopress.net", "source_url": "https://kilalopress.net" }, + { "source_id": "laprosperiteonline.net", "source_url": "https://laprosperiteonline.net" }, + { "source_id": "laprunellerdc.cd", "source_url": "https://laprunellerdc.cd" }, + { "source_id": "lesmedias.net", "source_url": "https://lesmedias.net" }, + { "source_id": "lesvolcansnews.net", "source_url": "https://lesvolcansnews.net" }, + { "source_id": "netic-news.net", "source_url": "https://www.netic-news.net" }, + { "source_id": "objectif-infos.cd", "source_url": "https://objectif-infos.cd" }, + { "source_id": "scooprdc.net", "source_url": "https://scooprdc.net" }, + { "source_id": "journaldekinshasa.com", "source_url": "https://www.journaldekinshasa.com" }, + { "source_id": "lepotentiel.cd", "source_url": "https://lepotentiel.cd" }, + { "source_id": "acturdc.com", "source_url": "https://acturdc.com" }, + { "source_id": "matininfos.net", "source_url": "https://matininfos.net" } + ] + } +} diff --git a/basango/apps/crawler/package.json b/basango/apps/crawler/package.json index 4736e0a..3b1d16c 100644 --- a/basango/apps/crawler/package.json +++ b/basango/apps/crawler/package.json @@ -7,7 +7,9 @@ "types": "dist/index.d.ts", "scripts": { "build": "tsc -b", - "test": "vitest --run" + "test": "vitest --run", + "queue": "bun run src/scripts/queue.ts", + "worker": "bun run src/scripts/worker.ts" }, "dependencies": { "@basango/logger": "workspace:*", diff --git a/basango/apps/crawler/src/config.ts b/basango/apps/crawler/src/config.ts index a0f4147..0409949 100644 --- a/basango/apps/crawler/src/config.ts +++ b/basango/apps/crawler/src/config.ts @@ -1,6 +1,8 @@ import fs from "node:fs"; import path from "node:path"; +import { logger } from "@basango/logger"; + import { PipelineConfig, PipelineConfigSchema, @@ -16,8 +18,8 @@ export interface LoadConfigOptions { } const DEFAULT_CONFIG_FILES = [ - path.join(process.cwd(), "config", "pipeline.json"), - path.join(process.cwd(), "pipeline.json"), + path.join(process.cwd(), "config", "pipeline.json"), + path.join(process.cwd(), "pipeline.json"), ]; const readJsonFile = (filePath: string): unknown => { @@ -25,10 +27,10 @@ const readJsonFile = (filePath: string): unknown => { return contents.trim() === "" ? {} : JSON.parse(contents); }; -const locateConfigFile = (explicit?: string): string => { - if (explicit && fs.existsSync(explicit)) { - return explicit; - } +export const locateConfigFile = (explicit?: string): string => { + if (explicit && fs.existsSync(explicit)) { + return explicit; + } for (const candidate of DEFAULT_CONFIG_FILES) { if (fs.existsSync(candidate)) { @@ -81,11 +83,104 @@ export const loadConfig = (options: LoadConfigOptions = {}): PipelineConfig => { }; export const dumpConfig = ( - config: PipelineConfig, - targetPath?: string, + config: PipelineConfig, + targetPath?: string, ): void => { - const destination = targetPath ?? locateConfigFile(); - const normalized = PipelineConfigSchema.parse(config); - fs.mkdirSync(path.dirname(destination), { recursive: true }); - fs.writeFileSync(destination, JSON.stringify(normalized, null, 2)); + const destination = targetPath ?? locateConfigFile(); + const normalized = PipelineConfigSchema.parse(config); + fs.mkdirSync(path.dirname(destination), { recursive: true }); + fs.writeFileSync(destination, JSON.stringify(normalized, null, 2)); }; + +export interface PipelineConfigManagerOptions { + configPath?: string; + env?: string; + autoLoad?: boolean; +} + +export class PipelineConfigManager { + private readonly explicitPath?: string; + + private readonly defaultEnv: string; + + private cache?: PipelineConfig; + + constructor(options: PipelineConfigManagerOptions = {}) { + this.explicitPath = options.configPath; + this.defaultEnv = options.env ?? "development"; + + if (options.autoLoad !== false) { + this.cache = loadConfig({ + configPath: this.explicitPath, + env: this.defaultEnv, + }); + } + } + + get(env?: string): PipelineConfig { + const resolvedEnv = env ?? this.defaultEnv; + + if (resolvedEnv !== this.defaultEnv) { + return loadConfig({ + configPath: this.explicitPath, + env: resolvedEnv, + }); + } + + if (!this.cache) { + this.cache = loadConfig({ + configPath: this.explicitPath, + env: resolvedEnv, + }); + } + + return this.cache; + } + + reload(env?: string): PipelineConfig { + const resolvedEnv = env ?? this.defaultEnv; + const config = loadConfig({ + configPath: this.explicitPath, + env: resolvedEnv, + }); + + if (resolvedEnv === this.defaultEnv) { + this.cache = config; + } + + return config; + } + + ensureDirectories(config?: PipelineConfig): PipelineConfig { + const pipeline = config ?? this.get(); + ensureDirectories(pipeline.paths); + return pipeline; + } + + setupLogging(config?: PipelineConfig): void { + const pipeline = config ?? this.get(); + this.ensureDirectories(pipeline); + + const level = pipeline.logging.level.toLowerCase(); + process.env.LOG_LEVEL = level; + const normalizedLevel = level as typeof logger.level; + logger.level = normalizedLevel; + + if (pipeline.logging.file_logging) { + const logDir = pipeline.paths.logs; + const destination = path.join( + logDir, + pipeline.logging.log_file, + ); + fs.mkdirSync(path.dirname(destination), { recursive: true }); + if (!fs.existsSync(destination)) { + fs.writeFileSync(destination, ""); + } + } + } + + resolveConfigPath(env?: string): string { + const base = locateConfigFile(this.explicitPath); + return resolveConfigPath(base, env ?? this.defaultEnv); + } +} diff --git a/basango/apps/crawler/src/scripts/queue.ts b/basango/apps/crawler/src/scripts/queue.ts new file mode 100644 index 0000000..14e9767 --- /dev/null +++ b/basango/apps/crawler/src/scripts/queue.ts @@ -0,0 +1,88 @@ +import { parseArgs } from "node:util"; + +import { logger } from "@basango/logger"; + +import { PipelineConfigManager } from "@crawler/config"; +import { scheduleAsyncCrawl } from "@crawler/services/crawler"; +import { createQueueSettings } from "@crawler/services/crawler/async/queue"; + +interface QueueCliOptions { + "source-id"?: string; + env: string; + "page-range"?: string; + "date-range"?: string; + category?: string; + "redis-url"?: string; + help?: boolean; +} + +const usage = `Usage: bun run src/scripts/queue.ts -- --source-id [options]\n\nOptions:\n --env Environment to load (default: development)\n --page-range Optional page range filter (e.g. 1:5)\n --date-range Optional date range filter (e.g. 2024-01-01:2024-01-31)\n --category Optional category to crawl\n --redis-url Override Redis connection URL\n -h, --help Show this message`; + +const parseCliArgs = (): QueueCliOptions => { + const { values } = parseArgs({ + options: { + "source-id": { type: "string" }, + env: { type: "string", default: "development" }, + "page-range": { type: "string" }, + "date-range": { type: "string" }, + category: { type: "string" }, + "redis-url": { type: "string" }, + help: { type: "boolean", short: "h" }, + }, + }); + + return values as QueueCliOptions; +}; + +const main = async (): Promise => { + const options = parseCliArgs(); + + if (options.help || !options["source-id"]) { + console.log(usage); + if (!options["source-id"]) { + process.exitCode = 1; + } + return; + } + + const env = options.env ?? "development"; + const manager = new PipelineConfigManager({ env }); + const config = manager.ensureDirectories(); + manager.setupLogging(config); + + const settings = options["redis-url"] + ? createQueueSettings({ redis_url: options["redis-url"] }) + : undefined; + + try { + const jobId = await scheduleAsyncCrawl({ + sourceId: options["source-id"], + env, + pageRange: options["page-range"] ?? null, + dateRange: options["date-range"] ?? null, + category: options.category ?? null, + settings, + }); + + logger.info( + { + jobId, + sourceId: options["source-id"], + env, + }, + "Scheduled asynchronous crawl job", + ); + console.log( + `Scheduled async crawl job ${jobId} for source '${options["source-id"]}' (env=${env})`, + ); + } catch (error) { + logger.error( + error instanceof Error ? error : { error }, + "Failed to schedule crawl job", + ); + console.error(`Failed to schedule crawl job: ${(error as Error).message}`); + process.exitCode = 1; + } +}; + +void main(); diff --git a/basango/apps/crawler/src/scripts/worker.ts b/basango/apps/crawler/src/scripts/worker.ts new file mode 100644 index 0000000..b038021 --- /dev/null +++ b/basango/apps/crawler/src/scripts/worker.ts @@ -0,0 +1,112 @@ +import { parseArgs } from "node:util"; + +import { logger } from "@basango/logger"; + +import { PipelineConfigManager } from "@crawler/config"; +import { createQueueManager, createQueueSettings } from "@crawler/services/crawler/async/queue"; +import { startWorker } from "@crawler/services/crawler/async/worker"; + +interface WorkerCliOptions { + env: string; + queue?: string[]; + concurrency?: string; + "redis-url"?: string; + help?: boolean; +} + +const usage = `Usage: bun run src/scripts/worker.ts [options]\n\nOptions:\n --env Environment to load (default: development)\n -q, --queue Queue name to listen on (repeatable)\n --concurrency Number of concurrent jobs per worker\n --redis-url Override Redis connection URL\n -h, --help Show this message`; + +const parseCliArgs = (): WorkerCliOptions => { + const { values } = parseArgs({ + options: { + env: { type: "string", default: "development" }, + queue: { type: "string", multiple: true, short: "q" }, + concurrency: { type: "string" }, + "redis-url": { type: "string" }, + help: { type: "boolean", short: "h" }, + }, + }); + + return values as WorkerCliOptions; +}; + +const parseConcurrency = (value?: string): number | undefined => { + if (!value) { + return undefined; + } + + const parsed = Number.parseInt(value, 10); + if (Number.isNaN(parsed) || parsed <= 0) { + throw new Error(`Invalid concurrency value: ${value}`); + } + + return parsed; +}; + +const main = async (): Promise => { + const options = parseCliArgs(); + + if (options.help) { + console.log(usage); + return; + } + + const env = options.env ?? "development"; + const manager = new PipelineConfigManager({ env }); + const config = manager.ensureDirectories(); + manager.setupLogging(config); + + let concurrency: number | undefined; + try { + concurrency = parseConcurrency(options.concurrency); + } catch (error) { + logger.error( + error instanceof Error ? error : { error }, + "Invalid concurrency value provided", + ); + process.exitCode = 1; + return; + } + const settings = options["redis-url"] + ? createQueueSettings({ redis_url: options["redis-url"] }) + : undefined; + const queueManager = createQueueManager({ settings }); + + const queueNames = options.queue?.length + ? options.queue.map((name) => queueManager.queueName(name)) + : undefined; + + const handle = startWorker({ + queueManager, + queueNames, + concurrency, + }); + + const shutdown = async (signal: NodeJS.Signals) => { + logger.info({ signal }, "Received shutdown signal, draining workers"); + try { + await handle.close(); + } finally { + await queueManager.close(); + process.exit(0); + } + }; + + process.once("SIGINT", (signal) => { + void shutdown(signal); + }); + process.once("SIGTERM", (signal) => { + void shutdown(signal); + }); + + logger.info( + { + env, + queueNames: queueNames ?? queueManager.iterQueueNames(), + concurrency: concurrency ?? "default", + }, + "Crawler workers started", + ); +}; + +void main(); diff --git a/basango/apps/crawler/src/utils.ts b/basango/apps/crawler/src/utils.ts index f55c04d..6f9f544 100644 --- a/basango/apps/crawler/src/utils.ts +++ b/basango/apps/crawler/src/utils.ts @@ -3,7 +3,7 @@ import fs from "node:fs"; import type { RedisOptions } from "ioredis"; import { get_encoding } from "tiktoken"; -import type { ProjectPaths } from "@/schema"; +import type { ProjectPaths } from "@crawler/schema"; export const ensureDirectories = (paths: ProjectPaths): void => { for (const dir of [paths.data, paths.logs, paths.configs]) { diff --git a/basango/apps/crawler/tsconfig.json b/basango/apps/crawler/tsconfig.json index 319ce91..a11e79f 100644 --- a/basango/apps/crawler/tsconfig.json +++ b/basango/apps/crawler/tsconfig.json @@ -4,8 +4,8 @@ "rootDir": "src", "outDir": "dist", "paths": { - "@basango/crawler": ["./src/index.ts"], - "@basango/crawler/*": ["./src/*"] + "@crawler": ["./src/index.ts"], + "@crawler/*": ["./src/*"] } }, "include": ["src"],