Merge pull request #5 from bernard-ng/codex/implement-crawler-logic-with-logging

feat: improve crawler async task logging and test support
This commit is contained in:
Bernard Ngandu
2025-10-31 23:31:10 +02:00
committed by GitHub
8 changed files with 163 additions and 58 deletions
+11 -10
View File
@@ -5,15 +5,16 @@
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc -b",
"test": "vitest --run"
},
"dependencies": {
"bullmq": "^4.17.0",
"date-fns": "^3.6.0",
"ioredis": "^5.3.2",
"tiktoken": "^1.0.14",
"zod": "^4.0.0"
"scripts": {
"build": "tsc -b",
"test": "vitest --run"
},
"dependencies": {
"@basango/logger": "workspace:*",
"bullmq": "^4.17.0",
"date-fns": "^3.6.0",
"ioredis": "^5.3.2",
"tiktoken": "^1.0.14",
"zod": "^4.0.0"
}
}
@@ -0,0 +1 @@
// Barrel file: re-exposes ../config as this package entry point's public surface.
export * from "../config";
@@ -0,0 +1 @@
// Barrel file: re-exposes the async crawler queue module as a package entry point.
export * from "../services/crawler/async/queue";
@@ -0,0 +1 @@
// Barrel file: re-exposes ../schema as this package entry point's public surface.
export * from "../schema";
@@ -0,0 +1 @@
// Barrel file: re-exposes the async crawler task helpers as a package entry point.
export * from "../services/crawler/async/tasks";
+24 -2
View File
@@ -310,5 +310,27 @@ export const resolveConfigPath = (basePath: string, env?: string): string => {
return `${withoutExt}.${env}${ext}`;
};
export const schemaToJSON = <T extends z.ZodTypeAny>(schema: T) =>
schema.toJSON();
/**
 * Best-effort JSON representation of a Zod schema.
 *
 * Prefers the schema's own `toJSON()` when present; otherwise falls back to a
 * coarse `{ type: ... }` descriptor derived from the schema's internal `_def`.
 *
 * NOTE(review): the previous fallback switched on `z.ZodFirstPartyTypeKind`,
 * an enum that was removed in zod v4 — and this package depends on
 * `"zod": "^4.0.0"` — so each `case` label would throw a TypeError at runtime
 * (property access on `undefined`). We instead read the def's type tag as a
 * plain string, accepting both the zod v3 spelling (`_def.typeName ===
 * "ZodObject"`) and the zod v4 spelling (`_def.type === "object"`).
 *
 * @param schema - any Zod schema
 * @returns a JSON-serializable descriptor of the schema
 */
export const schemaToJSON = <T extends z.ZodTypeAny>(schema: T): unknown => {
  // Some zod versions expose a native toJSON on schemas; trust it first.
  const candidate = schema as unknown as { toJSON?: () => unknown };
  if (typeof candidate.toJSON === "function") {
    return candidate.toJSON();
  }
  // Read the internal type tag without depending on the removed enum.
  const def = (schema as unknown as { _def?: { typeName?: string; type?: string } })
    ._def;
  const tag = def?.typeName ?? def?.type;
  switch (tag) {
    case "ZodObject": // zod v3 tag
    case "object": // zod v4 tag
      return { type: "object" };
    case "ZodArray":
    case "array":
      return { type: "array" };
    case "ZodString":
    case "string":
      return { type: "string" };
    case "ZodNumber":
    case "number":
      return { type: "number" };
    case "ZodBoolean":
    case "boolean":
      return { type: "boolean" };
    default:
      return { type: "unknown" };
  }
};
@@ -1,10 +1,12 @@
import { logger } from "@basango/logger";
import {
ListingTaskPayloadSchema,
ArticleTaskPayloadSchema,
ProcessedTaskPayloadSchema,
ListingTaskPayload,
ArticleTaskPayload,
ProcessedTaskPayload,
ListingTaskPayloadSchema,
ArticleTaskPayloadSchema,
ProcessedTaskPayloadSchema,
ListingTaskPayload,
ArticleTaskPayload,
ProcessedTaskPayload,
} from "./schemas";
import {
createQueueManager,
@@ -48,47 +50,122 @@ export interface ScheduleAsyncCrawlOptions {
}
export const scheduleAsyncCrawl = async ({
sourceId,
env = "development",
pageRange,
dateRange,
category,
settings,
queueManager,
sourceId,
env = "development",
pageRange,
dateRange,
category,
settings,
queueManager,
}: ScheduleAsyncCrawlOptions): Promise<string> => {
const payload = ListingTaskPayloadSchema.parse({
source_id: sourceId,
env,
page_range: pageRange ?? undefined,
date_range: dateRange ?? undefined,
category: category ?? undefined,
});
const payload = ListingTaskPayloadSchema.parse({
source_id: sourceId,
env,
page_range: pageRange ?? undefined,
date_range: dateRange ?? undefined,
category: category ?? undefined,
});
const manager = queueManager ?? createQueueManager({ settings });
try {
const job = await manager.enqueueListing(payload);
return job.id;
} finally {
if (!queueManager) {
await manager.close();
}
const manager = queueManager ?? createQueueManager({ settings });
logger.debug(
{
sourceId,
env: payload.env,
pageRange: payload.page_range,
dateRange: payload.date_range,
category: payload.category,
},
"Scheduling listing collection job",
);
try {
const job = await manager.enqueueListing(payload);
logger.info(
{ jobId: job.id, sourceId, env: payload.env },
"Scheduled listing collection job",
);
return job.id;
} finally {
if (!queueManager) {
await manager.close();
}
}
};
export const collectListing = async (payload: unknown): Promise<number> => {
const data = ListingTaskPayloadSchema.parse(payload);
const result = await handlers.collectListing(data);
return typeof result === "number" ? result : 0;
const data = ListingTaskPayloadSchema.parse(payload);
logger.debug(
{
sourceId: data.source_id,
env: data.env,
pageRange: data.page_range,
dateRange: data.date_range,
category: data.category,
},
"Collecting listing",
);
const result = await handlers.collectListing(data);
const count = typeof result === "number" ? result : 0;
logger.info(
{
sourceId: data.source_id,
env: data.env,
queuedArticles: count,
},
"Listing collection completed",
);
return count;
};
export const collectArticle = async (payload: unknown): Promise<unknown> => {
const data = ArticleTaskPayloadSchema.parse(payload);
return handlers.collectArticle(data);
const data = ArticleTaskPayloadSchema.parse(payload);
logger.debug(
{
sourceId: data.source_id,
env: data.env,
url: data.url,
page: data.page,
},
"Collecting article",
);
const result = await handlers.collectArticle(data);
logger.info(
{
sourceId: data.source_id,
env: data.env,
url: data.url,
},
"Article collection completed",
);
return result;
};
export const forwardForProcessing = async (
payload: unknown,
payload: unknown,
): Promise<unknown> => {
const data = ProcessedTaskPayloadSchema.parse(payload);
return handlers.forwardForProcessing(data);
const data = ProcessedTaskPayloadSchema.parse(payload);
logger.debug(
{
sourceId: data.source_id,
env: data.env,
},
"Forwarding article for processing",
);
const result = await handlers.forwardForProcessing(data);
logger.info(
{
sourceId: data.source_id,
env: data.env,
},
"Article forwarded for processing",
);
return result;
};
+11 -10
View File
@@ -1,12 +1,13 @@
{
"extends": "@basango/tsconfig/base.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"include": ["src"],
"references": [],
"paths": {
"@basango/crawler": ["src/**"]
}
"extends": "@basango/tsconfig/base.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist",
"paths": {
"@basango/crawler": ["./src/index.ts"],
"@basango/crawler/*": ["./src/*"]
}
},
"include": ["src"],
"references": []
}