feat(crawler): improve async tasks integration

This commit is contained in:
Bernard Ngandu
2025-10-31 23:30:24 +02:00
parent 113941077f
commit 69303cb075
8 changed files with 163 additions and 58 deletions
+1
View File
@@ -10,6 +10,7 @@
"test": "vitest --run" "test": "vitest --run"
}, },
"dependencies": { "dependencies": {
"@basango/logger": "workspace:*",
"bullmq": "^4.17.0", "bullmq": "^4.17.0",
"date-fns": "^3.6.0", "date-fns": "^3.6.0",
"ioredis": "^5.3.2", "ioredis": "^5.3.2",
@@ -0,0 +1 @@
export * from "../config";
@@ -0,0 +1 @@
export * from "../services/crawler/async/queue";
@@ -0,0 +1 @@
export * from "../schema";
@@ -0,0 +1 @@
export * from "../services/crawler/async/tasks";
+24 -2
View File
@@ -310,5 +310,27 @@ export const resolveConfigPath = (basePath: string, env?: string): string => {
return `${withoutExt}.${env}${ext}`; return `${withoutExt}.${env}${ext}`;
}; };
export const schemaToJSON = <T extends z.ZodTypeAny>(schema: T) => export const schemaToJSON = <T extends z.ZodTypeAny>(schema: T): unknown => {
schema.toJSON(); const candidate = schema as unknown as { toJSON?: () => unknown };
if (typeof candidate.toJSON === "function") {
return candidate.toJSON();
}
const typeName = (schema as { _def?: { typeName?: z.ZodFirstPartyTypeKind } })._def
?.typeName;
switch (typeName) {
case z.ZodFirstPartyTypeKind.ZodObject:
return { type: "object" };
case z.ZodFirstPartyTypeKind.ZodArray:
return { type: "array" };
case z.ZodFirstPartyTypeKind.ZodString:
return { type: "string" };
case z.ZodFirstPartyTypeKind.ZodNumber:
return { type: "number" };
case z.ZodFirstPartyTypeKind.ZodBoolean:
return { type: "boolean" };
default:
return { type: "unknown" };
}
};
@@ -1,3 +1,5 @@
import { logger } from "@basango/logger";
import { import {
ListingTaskPayloadSchema, ListingTaskPayloadSchema,
ArticleTaskPayloadSchema, ArticleTaskPayloadSchema,
@@ -65,8 +67,22 @@ export const scheduleAsyncCrawl = async ({
}); });
const manager = queueManager ?? createQueueManager({ settings }); const manager = queueManager ?? createQueueManager({ settings });
logger.debug(
{
sourceId,
env: payload.env,
pageRange: payload.page_range,
dateRange: payload.date_range,
category: payload.category,
},
"Scheduling listing collection job",
);
try { try {
const job = await manager.enqueueListing(payload); const job = await manager.enqueueListing(payload);
logger.info(
{ jobId: job.id, sourceId, env: payload.env },
"Scheduled listing collection job",
);
return job.id; return job.id;
} finally { } finally {
if (!queueManager) { if (!queueManager) {
@@ -77,18 +93,79 @@ export const scheduleAsyncCrawl = async ({
export const collectListing = async (payload: unknown): Promise<number> => { export const collectListing = async (payload: unknown): Promise<number> => {
const data = ListingTaskPayloadSchema.parse(payload); const data = ListingTaskPayloadSchema.parse(payload);
logger.debug(
{
sourceId: data.source_id,
env: data.env,
pageRange: data.page_range,
dateRange: data.date_range,
category: data.category,
},
"Collecting listing",
);
const result = await handlers.collectListing(data); const result = await handlers.collectListing(data);
return typeof result === "number" ? result : 0; const count = typeof result === "number" ? result : 0;
logger.info(
{
sourceId: data.source_id,
env: data.env,
queuedArticles: count,
},
"Listing collection completed",
);
return count;
}; };
export const collectArticle = async (payload: unknown): Promise<unknown> => { export const collectArticle = async (payload: unknown): Promise<unknown> => {
const data = ArticleTaskPayloadSchema.parse(payload); const data = ArticleTaskPayloadSchema.parse(payload);
return handlers.collectArticle(data); logger.debug(
{
sourceId: data.source_id,
env: data.env,
url: data.url,
page: data.page,
},
"Collecting article",
);
const result = await handlers.collectArticle(data);
logger.info(
{
sourceId: data.source_id,
env: data.env,
url: data.url,
},
"Article collection completed",
);
return result;
}; };
export const forwardForProcessing = async ( export const forwardForProcessing = async (
payload: unknown, payload: unknown,
): Promise<unknown> => { ): Promise<unknown> => {
const data = ProcessedTaskPayloadSchema.parse(payload); const data = ProcessedTaskPayloadSchema.parse(payload);
return handlers.forwardForProcessing(data); logger.debug(
{
sourceId: data.source_id,
env: data.env,
},
"Forwarding article for processing",
);
const result = await handlers.forwardForProcessing(data);
logger.info(
{
sourceId: data.source_id,
env: data.env,
},
"Article forwarded for processing",
);
return result;
}; };
+6 -5
View File
@@ -2,11 +2,12 @@
"extends": "@basango/tsconfig/base.json", "extends": "@basango/tsconfig/base.json",
"compilerOptions": { "compilerOptions": {
"rootDir": "src", "rootDir": "src",
"outDir": "dist" "outDir": "dist",
"paths": {
"@basango/crawler": ["./src/index.ts"],
"@basango/crawler/*": ["./src/*"]
}
}, },
"include": ["src"], "include": ["src"],
"references": [], "references": []
"paths": {
"@basango/crawler": ["src/**"]
}
} }