diff --git a/basango/apps/crawler/package.json b/basango/apps/crawler/package.json index 87d3d0e..4736e0a 100644 --- a/basango/apps/crawler/package.json +++ b/basango/apps/crawler/package.json @@ -5,15 +5,16 @@ "type": "module", "main": "dist/index.js", "types": "dist/index.d.ts", - "scripts": { - "build": "tsc -b", - "test": "vitest --run" - }, - "dependencies": { - "bullmq": "^4.17.0", - "date-fns": "^3.6.0", - "ioredis": "^5.3.2", - "tiktoken": "^1.0.14", - "zod": "^4.0.0" + "scripts": { + "build": "tsc -b", + "test": "vitest --run" + }, + "dependencies": { + "@basango/logger": "workspace:*", + "bullmq": "^4.17.0", + "date-fns": "^3.6.0", + "ioredis": "^5.3.2", + "tiktoken": "^1.0.14", + "zod": "^4.0.0" } } diff --git a/basango/apps/crawler/src/__tests__/config.ts b/basango/apps/crawler/src/__tests__/config.ts new file mode 100644 index 0000000..0cd11e6 --- /dev/null +++ b/basango/apps/crawler/src/__tests__/config.ts @@ -0,0 +1 @@ +export * from "../config"; diff --git a/basango/apps/crawler/src/__tests__/queue.ts b/basango/apps/crawler/src/__tests__/queue.ts new file mode 100644 index 0000000..109e99a --- /dev/null +++ b/basango/apps/crawler/src/__tests__/queue.ts @@ -0,0 +1 @@ +export * from "../services/crawler/async/queue"; diff --git a/basango/apps/crawler/src/__tests__/schema.ts b/basango/apps/crawler/src/__tests__/schema.ts new file mode 100644 index 0000000..2f37b99 --- /dev/null +++ b/basango/apps/crawler/src/__tests__/schema.ts @@ -0,0 +1 @@ +export * from "../schema"; diff --git a/basango/apps/crawler/src/__tests__/tasks.ts b/basango/apps/crawler/src/__tests__/tasks.ts new file mode 100644 index 0000000..e90b719 --- /dev/null +++ b/basango/apps/crawler/src/__tests__/tasks.ts @@ -0,0 +1 @@ +export * from "../services/crawler/async/tasks"; diff --git a/basango/apps/crawler/src/schema.ts b/basango/apps/crawler/src/schema.ts index 060e493..fbef893 100644 --- a/basango/apps/crawler/src/schema.ts +++ b/basango/apps/crawler/src/schema.ts @@ -310,5 +310,27 @@ export const resolveConfigPath = (basePath: string, env?: string): string => { return `${withoutExt}.${env}${ext}`; }; -export const schemaToJSON = (schema: T) => - schema.toJSON(); +export const schemaToJSON = (schema: T): unknown => { + const candidate = schema as unknown as { toJSON?: () => unknown }; + if (typeof candidate.toJSON === "function") { + return candidate.toJSON(); + } + + const typeName = (schema as { _def?: { typeName?: z.ZodFirstPartyTypeKind } })._def + ?.typeName; + + switch (typeName) { + case z.ZodFirstPartyTypeKind.ZodObject: + return { type: "object" }; + case z.ZodFirstPartyTypeKind.ZodArray: + return { type: "array" }; + case z.ZodFirstPartyTypeKind.ZodString: + return { type: "string" }; + case z.ZodFirstPartyTypeKind.ZodNumber: + return { type: "number" }; + case z.ZodFirstPartyTypeKind.ZodBoolean: + return { type: "boolean" }; + default: + return { type: "unknown" }; + } +}; diff --git a/basango/apps/crawler/src/services/crawler/async/tasks.ts b/basango/apps/crawler/src/services/crawler/async/tasks.ts index df86e5d..cca05cd 100644 --- a/basango/apps/crawler/src/services/crawler/async/tasks.ts +++ b/basango/apps/crawler/src/services/crawler/async/tasks.ts @@ -1,10 +1,12 @@ +import { logger } from "@basango/logger"; + import { - ListingTaskPayloadSchema, - ArticleTaskPayloadSchema, - ProcessedTaskPayloadSchema, - ListingTaskPayload, - ArticleTaskPayload, - ProcessedTaskPayload, + ListingTaskPayloadSchema, + ArticleTaskPayloadSchema, + ProcessedTaskPayloadSchema, + ListingTaskPayload, + ArticleTaskPayload, + ProcessedTaskPayload, } from "./schemas"; import { createQueueManager, @@ -48,47 +50,122 @@ export interface ScheduleAsyncCrawlOptions { } export const scheduleAsyncCrawl = async ({ - sourceId, - env = "development", - pageRange, - dateRange, - category, - settings, - queueManager, + sourceId, + env = "development", + pageRange, + dateRange, + category, + settings, + queueManager, }: ScheduleAsyncCrawlOptions): Promise => { - const payload = ListingTaskPayloadSchema.parse({ - source_id: sourceId, - env, - page_range: pageRange ?? undefined, - date_range: dateRange ?? undefined, - category: category ?? undefined, - }); + const payload = ListingTaskPayloadSchema.parse({ + source_id: sourceId, + env, + page_range: pageRange ?? undefined, + date_range: dateRange ?? undefined, + category: category ?? undefined, + }); - const manager = queueManager ?? createQueueManager({ settings }); - try { - const job = await manager.enqueueListing(payload); - return job.id; - } finally { - if (!queueManager) { - await manager.close(); - } + const manager = queueManager ?? createQueueManager({ settings }); + logger.debug( + { + sourceId, + env: payload.env, + pageRange: payload.page_range, + dateRange: payload.date_range, + category: payload.category, + }, + "Scheduling listing collection job", + ); + try { + const job = await manager.enqueueListing(payload); + logger.info( + { jobId: job.id, sourceId, env: payload.env }, + "Scheduled listing collection job", + ); + return job.id; + } finally { + if (!queueManager) { + await manager.close(); + } } }; export const collectListing = async (payload: unknown): Promise => { - const data = ListingTaskPayloadSchema.parse(payload); - const result = await handlers.collectListing(data); - return typeof result === "number" ? result : 0; + const data = ListingTaskPayloadSchema.parse(payload); + logger.debug( + { + sourceId: data.source_id, + env: data.env, + pageRange: data.page_range, + dateRange: data.date_range, + category: data.category, + }, + "Collecting listing", + ); + + const result = await handlers.collectListing(data); + const count = typeof result === "number" ? result : 0; + + logger.info( + { + sourceId: data.source_id, + env: data.env, + queuedArticles: count, + }, + "Listing collection completed", + ); + + return count; }; export const collectArticle = async (payload: unknown): Promise => { - const data = ArticleTaskPayloadSchema.parse(payload); - return handlers.collectArticle(data); + const data = ArticleTaskPayloadSchema.parse(payload); + logger.debug( + { + sourceId: data.source_id, + env: data.env, + url: data.url, + page: data.page, + }, + "Collecting article", + ); + + const result = await handlers.collectArticle(data); + + logger.info( + { + sourceId: data.source_id, + env: data.env, + url: data.url, + }, + "Article collection completed", + ); + + return result; }; export const forwardForProcessing = async ( - payload: unknown, + payload: unknown, ): Promise => { - const data = ProcessedTaskPayloadSchema.parse(payload); - return handlers.forwardForProcessing(data); + const data = ProcessedTaskPayloadSchema.parse(payload); + logger.debug( + { + sourceId: data.source_id, + env: data.env, + }, + "Forwarding article for processing", + ); + + const result = await handlers.forwardForProcessing(data); + + logger.info( + { + sourceId: data.source_id, + env: data.env, + }, + "Article forwarded for processing", + ); + + return result; }; diff --git a/basango/apps/crawler/tsconfig.json b/basango/apps/crawler/tsconfig.json index f11a34a..319ce91 100644 --- a/basango/apps/crawler/tsconfig.json +++ b/basango/apps/crawler/tsconfig.json @@ -1,12 +1,13 @@ { - "extends": "@basango/tsconfig/base.json", - "compilerOptions": { - "rootDir": "src", - "outDir": "dist" - }, - "include": ["src"], - "references": [], - "paths": { - "@basango/crawler": ["src/**"] - } + "extends": "@basango/tsconfig/base.json", + "compilerOptions": { + "rootDir": "src", + "outDir": "dist", + "paths": { + "@basango/crawler": ["./src/index.ts"], + "@basango/crawler/*": ["./src/*"] + } + }, + "include": ["src"], + "references": [] }