feat(domain): centralize data definitions

2025-11-17 00:04:27 +02:00
parent e7585aa76c
commit f39635e04f
96 changed files with 3474 additions and 1167 deletions
+5 -6
@@ -1,15 +1,14 @@
import path from "node:path";
import { loadConfig as defineConfig } from "@devscast/config";
import { z } from "zod";
import {
DateRangeSchema,
HtmlSourceConfigSchema,
PageRangeSchema,
TimestampRangeSchema,
UpdateDirectionSchema,
WordPressSourceConfigSchema,
} from "#crawler/schema";
} from "@basango/domain/crawler";
import { loadConfig as defineConfig } from "@devscast/config";
import { z } from "zod";
export const PROJECT_DIR = path.resolve(__dirname, "../");
@@ -43,7 +42,7 @@ export const PipelineConfigSchema = z.object({
}),
crawler: z.object({
category: z.string().optional(),
dateRange: DateRangeSchema.optional(),
dateRange: TimestampRangeSchema.optional(),
direction: UpdateDirectionSchema.default("forward"),
isUpdate: z.boolean().default(false),
maxWorkers: z.number().int().positive().default(5),
-29
@@ -1,29 +0,0 @@
/**
* Default date format used for parsing and formatting dates.
* Follows the "yyyy-LL-dd" pattern (e.g., "2024-06-15").
*/
export const DEFAULT_DATE_FORMAT = "yyyy-LL-dd";
/**
* Default User-Agent string for HTTP requests made by the crawler.
* Some websites may block requests with missing or generic User-Agent headers.
*/
export const DEFAULT_USER_AGENT = "Basango/0.1 (+https://github.com/bernard-ng/basango)";
/**
* User-Agent string used for Open Graph requests.
* Some services require a specific User-Agent to return Open Graph data.
*/
export const OPEN_GRAPH_USER_AGENT = "facebookexternalhit/1.1";
/**
* HTTP status codes considered transient errors.
* Used for retry logic in HTTP clients.
*/
export const TRANSIENT_HTTP_STATUSES = [429, 500, 502, 503, 504];
/**
* Default header name for Retry-After responses.
* Used when handling rate limiting.
*/
export const DEFAULT_RETRY_AFTER_HEADER = "retry-after";
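All of these constants move to `@basango/domain/constants`; the diffs below consume them under `DEFAULT_`-prefixed names. A minimal sketch of the new import surface (the export list is assumed from the import sites in those diffs):

```ts
// Sketch only: names are taken from the import sites in the diffs below;
// the domain package is assumed to keep the same values as the deleted file.
import {
  DEFAULT_DATE_FORMAT,
  DEFAULT_OPEN_GRAPH_USER_AGENT,
  DEFAULT_RETRY_AFTER_HEADER,
  DEFAULT_TRANSIENT_HTTP_STATUSES,
  DEFAULT_USER_AGENT,
} from "@basango/domain/constants";

console.log(DEFAULT_USER_AGENT); // "Basango/0.1 (+https://github.com/bernard-ng/basango)"
```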
+5 -4
@@ -1,11 +1,12 @@
import { setTimeout as delay } from "node:timers/promises";
import { FetchClientConfig } from "#crawler/config";
import {
DEFAULT_RETRY_AFTER_HEADER,
DEFAULT_TRANSIENT_HTTP_STATUSES,
DEFAULT_USER_AGENT,
TRANSIENT_HTTP_STATUSES,
} from "#crawler/constants";
} from "@basango/domain/constants";
import { FetchClientConfig } from "#crawler/config";
import { UserAgents } from "#crawler/http/user-agent";
export type HttpHeaders = Record<string, string>;
@@ -187,7 +188,7 @@ export class SyncHttpClient extends BaseHttpClient {
const response = await this.fetchImpl(target, init);
if (
TRANSIENT_HTTP_STATUSES.includes(response.status as number) &&
DEFAULT_TRANSIENT_HTTP_STATUSES.includes(response.status as number) &&
attempt < this.config.maxRetries
) {
await this.maybeDelay(attempt, response, retryAfterHeader);
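Only the constant's name changes in the retry branch; the decision logic stays the same. A self-contained sketch of that check plus the Retry-After handling it implies (standalone helpers, not the client's actual internals):

```ts
// Hedged, standalone restatement of the retry decision shown above.
const DEFAULT_TRANSIENT_HTTP_STATUSES = [429, 500, 502, 503, 504];
const DEFAULT_RETRY_AFTER_HEADER = "retry-after";

const shouldRetry = (response: Response, attempt: number, maxRetries: number): boolean =>
  DEFAULT_TRANSIENT_HTTP_STATUSES.includes(response.status) && attempt < maxRetries;

// Retry-After can be seconds or an HTTP date; this sketch handles only the
// seconds form and falls back to a fixed delay otherwise.
const retryDelayMs = (response: Response, fallbackMs = 1_000): number => {
  const header = response.headers.get(DEFAULT_RETRY_AFTER_HEADER);
  const seconds = header ? Number.parseInt(header, 10) : Number.NaN;
  return Number.isFinite(seconds) ? seconds * 1_000 : fallbackMs;
};
```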
+3 -3
@@ -1,10 +1,10 @@
import { DEFAULT_OPEN_GRAPH_USER_AGENT } from "@basango/domain/constants";
import { ArticleMetadata } from "@basango/domain/models";
import { parse } from "node-html-parser";
import { config } from "#crawler/config";
import { OPEN_GRAPH_USER_AGENT } from "#crawler/constants";
import { SyncHttpClient } from "#crawler/http/http-client";
import { UserAgents } from "#crawler/http/user-agent";
import { ArticleMetadata } from "#crawler/schema";
import { createAbsoluteUrl } from "#crawler/utils";
/**
@@ -45,7 +45,7 @@ export class OpenGraph {
constructor() {
const settings = config.fetch.client;
const provider = new UserAgents(true, OPEN_GRAPH_USER_AGENT);
const provider = new UserAgents(true, DEFAULT_OPEN_GRAPH_USER_AGENT);
this.client = new SyncHttpClient(settings, {
defaultHeaders: { "User-Agent": provider.og() },
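For context, a hedged usage sketch of this helper; `consumeUrl` appears in the base-crawler diff below, and its return shape is assumed to be the `ArticleMetadata` model (`undefined` on failure):

```ts
import { OpenGraph } from "#crawler/http/open-graph";

// Assumption: consumeUrl resolves ArticleMetadata | undefined, as suggested
// by enrichWithOpenGraph in the base-crawler diff further down.
const og = new OpenGraph();
const metadata = await og.consumeUrl("https://example.com/article"); // hypothetical URL
console.log(metadata?.title, metadata?.image);
```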
+2 -2
@@ -1,4 +1,4 @@
import { DEFAULT_USER_AGENT, OPEN_GRAPH_USER_AGENT } from "#crawler/constants";
import { DEFAULT_OPEN_GRAPH_USER_AGENT, DEFAULT_USER_AGENT } from "@basango/domain/constants";
/**
* User agent provider with optional rotation.
@@ -30,7 +30,7 @@ export class UserAgents {
}
og(): string {
return OPEN_GRAPH_USER_AGENT;
return DEFAULT_OPEN_GRAPH_USER_AGENT;
}
get(): string {
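Together with the `OpenGraph` constructor above, the provider now sources both agents from the domain package. A small usage sketch (the boolean's rotation semantics are assumed from that constructor call):

```ts
import { DEFAULT_OPEN_GRAPH_USER_AGENT } from "@basango/domain/constants";
import { UserAgents } from "#crawler/http/user-agent";

// Assumption: the first argument toggles rotation for get(); og() is fixed.
const provider = new UserAgents(true, DEFAULT_OPEN_GRAPH_USER_AGENT);
provider.og(); // always DEFAULT_OPEN_GRAPH_USER_AGENT ("facebookexternalhit/1.1")
provider.get(); // a general-purpose User-Agent, possibly rotated
```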
+9 -21
@@ -1,8 +1,8 @@
import type { HtmlSourceConfig, WordPressSourceConfig } from "@basango/domain/crawler";
import { Article } from "@basango/domain/models";
import { logger } from "@basango/logger";
import { config, env } from "#crawler/config";
import { UnsupportedSourceKindError } from "#crawler/errors";
import { SyncHttpClient } from "#crawler/http/http-client";
import { QueueManager, createQueueManager } from "#crawler/process/async/queue";
import {
DetailsTaskPayload,
@@ -12,11 +12,11 @@ import {
import { createPersistors, resolveCrawlerConfig } from "#crawler/process/crawler";
import { HtmlCrawler } from "#crawler/process/parsers/html";
import { WordPressCrawler } from "#crawler/process/parsers/wordpress";
import { Article, HtmlSourceConfig, WordPressSourceConfig } from "#crawler/schema";
import { forward } from "#crawler/process/persistence";
import {
createDateRange,
formatDateRange,
createTimestampRange,
formatPageRange,
formatTimestampRange,
resolveSourceConfig,
} from "#crawler/utils";
@@ -45,7 +45,7 @@ export const collectHtmlListing = async (
await manager.enqueueArticle({
category: payload.category,
dateRange: createDateRange(payload.dateRange),
dateRange: createTimestampRange(payload.dateRange),
sourceId: payload.sourceId,
url,
} as DetailsTaskPayload);
@@ -85,7 +85,7 @@ export const collectWordPressListing = async (
await manager.enqueueArticle({
category: payload.category,
data,
dateRange: createDateRange(payload.dateRange),
dateRange: createTimestampRange(payload.dateRange),
sourceId: payload.sourceId,
url,
} as DetailsTaskPayload);
@@ -106,7 +106,7 @@ export const collectArticle = async (
const source = resolveSourceConfig(payload.sourceId);
const settings = resolveCrawlerConfig(source, {
category: payload.category,
dateRange: payload.dateRange ? formatDateRange(payload.dateRange) : undefined,
dateRange: payload.dateRange ? formatTimestampRange(payload.dateRange) : undefined,
pageRange: payload.pageRange ? formatPageRange(payload.pageRange) : undefined,
sourceId: payload.sourceId,
});
@@ -141,19 +141,7 @@ export const forwardForProcessing = async (payload: ProcessingTaskPayload): Prom
try {
logger.info({ article: payload.article.title }, "Forwarding article to API");
const client = new SyncHttpClient(config.fetch.client);
const response = await client.post(env("BASANGO_CRAWLER_BACKEND_API_ENDPOINT"), {
headers: {
Authorization: `${env("BASANGO_CRAWLER_TOKEN")}`,
},
json: payload.article,
});
if (response.ok) {
const data = await response.json();
logger.info({ ...data }, "Article successfully forwarded to API");
}
await forward(payload.article);
} catch (error) {
logger.error({ error }, "Failed to forward article to API");
}
+4 -4
@@ -1,7 +1,7 @@
import { PageRangeSchema, TimestampRangeSchema } from "@basango/domain/crawler";
import { articleSchema } from "@basango/domain/models";
import { z } from "zod";
import { ArticleSchema, DateRangeSchema, PageRangeSchema } from "#crawler/schema";
export const ListingTaskPayloadSchema = z.object({
category: z.string().optional(),
dateRange: z.string().optional(),
@@ -12,7 +12,7 @@ export const ListingTaskPayloadSchema = z.object({
export const DetailsTaskPayloadSchema = z.object({
category: z.string().optional(),
data: z.any().optional(),
dateRange: DateRangeSchema.optional(),
dateRange: TimestampRangeSchema.optional(),
page: z.number().int().nonnegative().optional(),
pageRange: PageRangeSchema.optional(),
sourceId: z.string(),
@@ -20,7 +20,7 @@ export const DetailsTaskPayloadSchema = z.object({
});
export const ProcessingTaskPayloadSchema = z.object({
article: ArticleSchema,
article: articleSchema,
sourceId: z.string(),
});
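A hedged construction example for the reworked payloads; the range shapes come from `@basango/domain/crawler`, while the source id and category are hypothetical:

```ts
import { PageRangeSchema, TimestampRangeSchema } from "@basango/domain/crawler";

// DetailsTaskPayload now carries a numeric timestamp range rather than the old DateRange.
const dateRange = TimestampRangeSchema.parse({ end: 1_700_086_400, start: 1_700_000_000 });
const pageRange = PageRangeSchema.parse({ end: 10, start: 1 });

// This object matches DetailsTaskPayloadSchema as defined in the diff above.
const payload = {
  category: "politique", // hypothetical
  dateRange,
  pageRange,
  sourceId: "radiookapi", // hypothetical
};
```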
+3 -3
@@ -1,9 +1,9 @@
import type { AnySourceConfig } from "@basango/domain/crawler";
import logger from "@basango/logger";
import { FetchCrawlerConfig, config } from "#crawler/config";
import { JsonlPersistor, Persistor } from "#crawler/process/persistence";
import { AnySourceConfig } from "#crawler/schema";
import { createDateRange, createPageRange } from "#crawler/utils";
import { createPageRange, createTimestampRange } from "#crawler/utils";
export interface CrawlingOptions {
sourceId: string;
@@ -19,7 +19,7 @@ export const resolveCrawlerConfig = (
return {
...config.fetch.crawler,
category: options.category,
dateRange: createDateRange(options.dateRange),
dateRange: createTimestampRange(options.dateRange),
pageRange: createPageRange(options.pageRange),
source,
};
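`resolveCrawlerConfig` still accepts string specs and normalizes them through the renamed helpers. A hedged call sketch (the source id is hypothetical; spec formats follow the utils diff below):

```ts
import { resolveCrawlerConfig } from "#crawler/process/crawler";
import { resolveSourceConfig } from "#crawler/utils";

// Pages use "start:end"; dates use "yyyy-LL-dd:yyyy-LL-dd" by default.
const source = resolveSourceConfig("radiookapi"); // hypothetical source id
const settings = resolveCrawlerConfig(source, {
  dateRange: "2024-01-01:2024-06-30",
  pageRange: "1:10",
  sourceId: "radiookapi",
});
```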
+6 -2
@@ -1,10 +1,11 @@
import type { AnySourceConfig } from "@basango/domain/crawler";
import { Article } from "@basango/domain/models";
import { HTMLElement, parse as parseHtml } from "node-html-parser";
import { FetchCrawlerConfig, config } from "#crawler/config";
import { SyncHttpClient } from "#crawler/http/http-client";
import { OpenGraph } from "#crawler/http/open-graph";
import type { Persistor } from "#crawler/process/persistence";
import { AnySourceConfig, Article } from "#crawler/schema";
export interface CrawlerOptions {
persistors?: Persistor[];
@@ -97,7 +98,10 @@ export abstract class BaseCrawler {
* @param record - The article record
* @param url - The URL to fetch Open Graph data from
*/
protected async enrichWithOpenGraph(record: Article, url?: string): Promise<Article> {
protected async enrichWithOpenGraph(
record: Partial<Article>,
url?: string,
): Promise<Partial<Article>> {
try {
const metadata = url ? await this.openGraph.consumeUrl(url) : undefined;
return { ...record, metadata };
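Widening the signature to `Partial<Article>` lets enrichment run on half-built records; completeness is asserted later, in `persist` (see the non-null assertions in the persistence diff below). Illustratively:

```ts
import type { Article } from "@basango/domain/models";

// A record mid-crawl: only some Article fields exist yet.
const draft: Partial<Article> = { link: "https://example.com/a", title: "Draft" };
// await crawler.enrichWithOpenGraph(draft, draft.link); // protected; callable from subclasses only
```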
+3 -2
@@ -1,3 +1,5 @@
import type { HtmlSourceConfig, TimestampRange } from "@basango/domain/crawler";
import { Article } from "@basango/domain/models";
import { logger } from "@basango/logger";
import { fromUnixTime, getUnixTime, isMatch as isDateMatch, parse } from "date-fns";
import { HTMLElement } from "node-html-parser";
@@ -12,7 +14,6 @@ import {
} from "#crawler/errors";
import { BaseCrawler } from "#crawler/process/parsers/base";
import { Persistor, persist } from "#crawler/process/persistence";
import { Article, DateRange, HtmlSourceConfig } from "#crawler/schema";
import { createAbsoluteUrl, isTimestampInRange } from "#crawler/utils";
const md = new TurndownService({
@@ -106,7 +107,7 @@ export class HtmlCrawler extends BaseCrawler {
* @param html - The HTML content of the article
* @param dateRange - Optional date range for filtering
*/
async fetchOne(html: string, dateRange?: DateRange | null): Promise<Article> {
async fetchOne(html: string, dateRange?: TimestampRange | null): Promise<Partial<Article>> {
const root = this.parseHtml(html);
const selectors = this.source.sourceSelectors;
@@ -1,3 +1,5 @@
import type { PageRange, TimestampRange, WordPressSourceConfig } from "@basango/domain/crawler";
import { Article } from "@basango/domain/models";
import { logger } from "@basango/logger";
import { fromUnixTime } from "date-fns";
import TurndownService from "turndown";
@@ -10,7 +12,6 @@ import {
} from "#crawler/errors";
import { BaseCrawler } from "#crawler/process/parsers/base";
import { Persistor, persist } from "#crawler/process/persistence";
import { Article, DateRange, PageRange, WordPressSourceConfig } from "#crawler/schema";
import { isTimestampInRange } from "#crawler/utils";
const md = new TurndownService({
@@ -107,7 +108,7 @@ export class WordPressCrawler extends BaseCrawler {
* @param input - Decoded JSON object or raw JSON string
* @param dateRange - Optional date range for filtering
*/
async fetchOne(input: unknown, dateRange?: DateRange | null): Promise<Article> {
async fetchOne(input: unknown, dateRange?: TimestampRange | null): Promise<Article> {
// input can be the decoded JSON object or a raw JSON string
let data: WordPressPost | null = null;
try {
+46 -10
@@ -1,13 +1,15 @@
import fs from "node:fs";
import path from "node:path";
import type { Article } from "@basango/domain/models";
import { md5 } from "@basango/encryption";
import logger from "@basango/logger";
import { Article } from "#crawler/schema";
import { config, env } from "#crawler/config";
import { HttpError, SyncHttpClient } from "#crawler/http/http-client";
export interface Persistor {
persist(record: Article): Promise<void> | void;
persist(record: Partial<Article>): Promise<void> | void;
close: () => Promise<void> | void;
}
@@ -35,17 +37,20 @@ const sanitize = (text: string): string => {
return s.trim();
};
export const persist = async (payload: Article, persistors: Persistor[]): Promise<Article> => {
export const persist = async (
payload: Partial<Article>,
persistors: Persistor[],
): Promise<Article> => {
const data = {
...payload,
body: sanitize(payload.body),
categories: payload.categories.map(sanitize),
title: sanitize(payload.title),
body: sanitize(payload.body!),
categories: payload.categories!.map(sanitize),
title: sanitize(payload.title!),
};
const article = {
...data,
hash: md5(data.link),
hash: md5(data.link!),
} as Article;
for (const persistor of persistors) {
@@ -60,6 +65,37 @@ export const persist = async (payload: Article, persistors: Persistor[]): Promis
return article;
};
export const forward = async (payload: Partial<Article>): Promise<void> => {
const client = new SyncHttpClient(config.fetch.client);
const endpoint = env("BASANGO_CRAWLER_BACKEND_API_ENDPOINT");
const token = env("BASANGO_CRAWLER_TOKEN");
try {
const response = await client.post(endpoint, {
headers: {
Authorization: `${token}`,
},
json: payload,
});
if (response.ok) {
const data = await response.json();
logger.info({ ...data }, "Article forwarded");
return;
}
logger.error({ status: response.status, url: payload.link }, "Forwarding failed");
} catch (error) {
if (error instanceof HttpError) {
const data = await error.response.json();
logger.error({ ...data, url: payload.link }, "Error forwarding article");
return;
}
logger.error({ error, url: payload.link }, "Error forwarding article");
}
};
export class JsonlPersistor implements Persistor {
private readonly filePath: string;
private readonly encoding: BufferEncoding;
@@ -78,15 +114,15 @@ export class JsonlPersistor implements Persistor {
}
}
persist(record: Article): Promise<void> {
persist(payload: Partial<Article>): Promise<void> {
if (this.closed) {
return Promise.reject(new Error("Persistor has been closed"));
}
const payload = `${JSON.stringify(record)}\n`;
const record = `${JSON.stringify(payload)}\n`;
this.pending = this.pending.then(async () => {
fs.appendFileSync(this.filePath, payload, { encoding: this.encoding });
fs.appendFileSync(this.filePath, record, { encoding: this.encoding });
});
return this.pending;
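With forwarding now owned by the persistence module, call sites reduce to a single `forward` invocation. A hedged example (the record fields mirror the Article model; `BASANGO_CRAWLER_BACKEND_API_ENDPOINT` and `BASANGO_CRAWLER_TOKEN` must be set):

```ts
import { forward } from "#crawler/process/persistence";

// Posts the record to the backend endpoint with the crawler token as the
// Authorization header, logging success or failure as shown above.
await forward({
  body: "Body text", // hypothetical record
  categories: [],
  link: "https://example.com/article",
  publishedAt: new Date(),
  sourceId: "example",
  title: "Example",
});
```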
-130
@@ -1,130 +0,0 @@
import { z } from "zod";
export const UpdateDirectionSchema = z.enum(["forward", "backward"]);
export const SourceKindSchema = z.enum(["wordpress", "html"]);
export const DateRangeSchema = z
.object({
end: z.number().int(),
start: z.number().int(),
})
.superRefine((value, ctx) => {
if (value.start === 0 || value.end === 0) {
ctx.addIssue({
code: "custom",
message: "Timestamp cannot be zero",
});
}
if (value.end < value.start) {
ctx.addIssue({
code: "custom",
message: "End timestamp must be greater than or equal to start",
});
}
});
export const PageRangeSchema = z
.object({
end: z.number().int().min(0),
start: z.number().int().min(0),
})
.superRefine((value, ctx) => {
if (value.end < value.start) {
ctx.addIssue({
code: "custom",
message: "End page must be greater than or equal to start page",
});
}
});
export const PageRangeSpecSchema = z
.string()
.regex(/^[0-9]+:[0-9]+$/, "Invalid page range format. Use start:end")
.transform((spec) => {
const [startText, endText] = spec.split(":");
return {
end: Number.parseInt(String(endText), 10),
start: Number.parseInt(String(startText), 10),
};
});
export const DateRangeSpecSchema = z
.string()
.regex(/.+:.+/, "Expected start:end format")
.transform((spec) => {
const [startRaw, endRaw] = spec.split(":");
return { endRaw: String(endRaw), startRaw: String(startRaw) };
});
export const SourceDateSchema = z.object({
format: z.string().default("yyyy-LL-dd HH:mm"),
});
const BaseSourceSchema = z.object({
categories: z.array(z.string()).default([]),
requiresDetails: z.boolean().default(false),
requiresRateLimit: z.boolean().default(false),
sourceDate: SourceDateSchema,
sourceId: z.string(),
sourceKind: SourceKindSchema,
sourceUrl: z.url(),
supportsCategories: z.boolean().default(false),
});
export const HtmlSourceConfigSchema = BaseSourceSchema.extend({
paginationTemplate: z.string(),
sourceKind: z.literal("html"),
sourceSelectors: z.object({
articleBody: z.string(),
articleCategories: z.string().optional(),
articleDate: z.string(),
articleLink: z.string(),
articles: z.string(),
articleTitle: z.string(),
pagination: z.string().default("ul.pagination > li a"),
}),
});
export const WordPressSourceConfigSchema = BaseSourceSchema.extend({
sourceDate: SourceDateSchema.default(SourceDateSchema.parse({ format: "yyyy-LL-dd'T'HH:mm:ss" })),
sourceKind: z.literal("wordpress"),
});
export const ArticleMetadataSchema = z.object({
description: z.string().optional(),
image: z.string().optional(),
title: z.string().optional(),
url: z.url().optional(),
});
export const ArticleTokenStatisticsSchema = z.object({
body: z.number().int().nonnegative().default(0),
categories: z.number().int().nonnegative().default(0),
excerpt: z.number().int().nonnegative().default(0),
title: z.number().int().nonnegative().default(0),
});
export const ArticleSchema = z.object({
body: z.string(),
categories: z.array(z.string()).default([]),
hash: z.string().optional(),
link: z.url(),
metadata: ArticleMetadataSchema.optional(),
publishedAt: z.date(),
sourceId: z.string(),
title: z.string(),
tokenStatistics: ArticleTokenStatisticsSchema.optional(),
});
export type ArticleMetadata = z.infer<typeof ArticleMetadataSchema>;
export type Article = z.infer<typeof ArticleSchema>;
export type DateRange = z.infer<typeof DateRangeSchema>;
export type PageRange = z.infer<typeof PageRangeSchema>;
export type HtmlSourceConfig = z.infer<typeof HtmlSourceConfigSchema>;
export type WordPressSourceConfig = z.infer<typeof WordPressSourceConfigSchema>;
export type AnySourceConfig = HtmlSourceConfig | WordPressSourceConfig;
export interface CreateDateRangeOptions {
format?: string;
separator?: string;
}
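These definitions move to `@basango/domain/crawler`, with `DateRangeSchema` renamed to `TimestampRangeSchema` and the spec schemas to `DateSpecSchema`/`PageSpecSchema` (see the utils diff below). A validation sketch, assuming the relocated schema keeps the refinement rules shown above:

```ts
import { TimestampRangeSchema } from "@basango/domain/crawler";

// Passes: non-zero timestamps with end >= start.
TimestampRangeSchema.parse({ end: 1_706_745_600, start: 1_704_067_200 });

// Fails: end precedes start (assuming the rules above carried over).
const bad = TimestampRangeSchema.safeParse({ end: 1_704_067_200, start: 1_706_745_600 });
console.log(bad.success); // false
```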
@@ -1,3 +1,5 @@
#! /usr/bin/env bun
import { logger } from "@basango/logger";
import { scheduleAsyncCrawl } from "#crawler/process/async/tasks";
-23
@@ -1,23 +0,0 @@
import { logger } from "@basango/logger";
import { runSyncCrawl } from "#crawler/process/sync/tasks";
import { CRAWLING_USAGE, parseCrawlingCliArgs } from "#crawler/scripts/utils";
const main = async (): Promise<void> => {
const options = parseCrawlingCliArgs();
if (options.sourceId === undefined) {
console.log(CRAWLING_USAGE);
process.exitCode = 1;
return;
}
try {
await runSyncCrawl({ ...options });
} catch (error) {
logger.error({ error }, "Synchronous crawl failed");
process.exitCode = 1;
}
};
void main();
+79
@@ -0,0 +1,79 @@
#! /usr/bin/env bun
import fs from "node:fs";
import path from "node:path";
import { createInterface } from "node:readline";
import { parseArgs } from "node:util";
import type { Article } from "@basango/domain/models";
import { logger } from "@basango/logger";
import { config } from "#crawler/config";
import { forward } from "#crawler/process/persistence";
const USAGE = `
Usage: bun run crawler:sync -- --sourceId <id>
`;
const parseCliArgs = (): { sourceId?: string } => {
const { values } = parseArgs({
options: {
sourceId: { type: "string" },
},
});
return values as { sourceId?: string };
};
const main = async (): Promise<void> => {
const { sourceId } = parseCliArgs();
if (!sourceId) {
console.log(USAGE);
process.exitCode = 1;
return;
}
const filePath = path.join(config.paths.data, `${sourceId}.jsonl`);
if (!fs.existsSync(filePath)) {
logger.error({ filePath, sourceId }, "Source must be crawled first; JSONL not found");
process.exitCode = 1;
return;
}
const stat = fs.statSync(filePath);
if (stat.size === 0) {
logger.error({ filePath, sourceId }, "Source must be crawled first; JSONL is empty");
process.exitCode = 1;
return;
}
logger.info({ filePath, sourceId }, "Syncing articles from JSONL to backend");
const stream = fs.createReadStream(filePath, { encoding: "utf-8" });
const rl = createInterface({ crlfDelay: Infinity, input: stream });
let count = 0;
try {
for await (const raw of rl) {
const line = raw.trim();
if (!line) continue;
try {
const article = JSON.parse(line) as Article & { publishedAt: string };
await forward({
...article,
publishedAt: new Date(article.publishedAt),
});
count += 1;
} catch (error) {
logger.error({ error, linePreview: line.slice(0, 100) }, "Invalid JSONL line");
}
}
} finally {
rl.close();
}
logger.info({ forwarded: count, sourceId }, "Sync completed");
};
void main();
+11 -95
@@ -1,109 +1,25 @@
import fs from "node:fs";
import path from "node:path";
import { createInterface } from "node:readline";
import { parseArgs } from "node:util";
#! /usr/bin/env bun
import { logger } from "@basango/logger";
import { config, env } from "#crawler/config";
import { HttpError, SyncHttpClient } from "#crawler/http/http-client";
import type { Article } from "#crawler/schema";
const USAGE = `
Usage: bun run crawler:sync -- --sourceId <id>
`;
const parseCliArgs = (): { sourceId?: string } => {
const { values } = parseArgs({
options: {
sourceId: { type: "string" },
},
});
return values as { sourceId?: string };
};
const forwardArticle = async (article: Article): Promise<void> => {
const client = new SyncHttpClient(config.fetch.client);
const endpoint = env("BASANGO_CRAWLER_BACKEND_API_ENDPOINT");
const token = env("BASANGO_CRAWLER_TOKEN");
try {
const response = await client.post(endpoint, {
headers: {
Authorization: `${token}`,
},
json: article,
});
if (response.ok) {
const data = await response.json();
logger.info({ ...data }, "Article forwarded");
return;
}
logger.error({ link: article.link, status: response.status }, "Forwarding failed");
} catch (error) {
if (error instanceof HttpError) {
const data = await error.response.json();
logger.error({ ...data, link: article.link }, "Error forwarding article");
return;
}
logger.error({ error, link: article.link }, "Error forwarding article");
}
};
import { runSyncCrawl } from "#crawler/process/sync/tasks";
import { CRAWLING_USAGE, parseCrawlingCliArgs } from "#crawler/scripts/utils";
const main = async (): Promise<void> => {
const { sourceId } = parseCliArgs();
if (!sourceId) {
console.log(USAGE);
const options = parseCrawlingCliArgs();
if (options.sourceId === undefined) {
console.log(CRAWLING_USAGE);
process.exitCode = 1;
return;
}
const filePath = path.join(config.paths.data, `${sourceId}.jsonl`);
if (!fs.existsSync(filePath)) {
logger.error({ filePath, sourceId }, "Source must be crawled first; JSONL not found");
process.exitCode = 1;
return;
}
const stat = fs.statSync(filePath);
if (stat.size === 0) {
logger.error({ filePath, sourceId }, "Source must be crawled first; JSONL is empty");
process.exitCode = 1;
return;
}
logger.info({ filePath, sourceId }, "Syncing articles from JSONL to backend");
const stream = fs.createReadStream(filePath, { encoding: "utf-8" });
const rl = createInterface({ crlfDelay: Infinity, input: stream });
let count = 0;
try {
for await (const raw of rl) {
const line = raw.trim();
if (!line) continue;
try {
const article = JSON.parse(line) as Article & { publishedAt: string };
await forwardArticle({
...article,
publishedAt: new Date(article.publishedAt),
});
count += 1;
} catch (error) {
logger.error({ error, linePreview: line.slice(0, 100) }, "Invalid JSONL line");
}
}
} finally {
rl.close();
await runSyncCrawl({ ...options });
} catch (error) {
logger.error({ error }, "Synchronous crawl failed");
process.exitCode = 1;
}
logger.info({ forwarded: count, sourceId }, "Sync completed");
};
void main();
+2
View File
@@ -1,3 +1,5 @@
#! /usr/bin/env bun
import { logger } from "@basango/logger";
import { createQueueManager } from "#crawler/process/async/queue";
+23 -35
@@ -1,20 +1,19 @@
import { format, getUnixTime, isMatch, parse } from "date-fns";
import type { RedisOptions } from "ioredis";
import { config } from "#crawler/config";
import { DEFAULT_DATE_FORMAT } from "#crawler/constants";
import { DEFAULT_DATE_FORMAT } from "@basango/domain/constants";
import {
AnySourceConfig,
CreateDateRangeOptions,
DateRange,
DateRangeSchema,
DateRangeSpecSchema,
DateSpecSchema,
HtmlSourceConfig,
PageRange,
PageRangeSchema,
PageRangeSpecSchema,
PageSpecSchema,
TimestampRange,
TimestampRangeSchema,
WordPressSourceConfig,
} from "#crawler/schema";
} from "@basango/domain/crawler";
import { format, fromUnixTime, getUnixTime, isMatch, parse } from "date-fns";
import type { RedisOptions } from "ioredis";
import { config } from "#crawler/config";
/**
* Resolve a source configuration by its ID.
@@ -71,7 +70,7 @@ const parseDate = (value: string, format: string): Date => {
*/
export const createPageRange = (spec: string | undefined): PageRange | undefined => {
if (!spec) return undefined;
const parsed = PageRangeSpecSchema.parse(spec);
const parsed = PageSpecSchema.parse(spec);
return PageRangeSchema.parse(parsed);
};
@@ -80,10 +79,13 @@ export const createPageRange = (spec: string | undefined): PageRange | undefined
* @param spec - The date range specification (e.g., "2023-01-01:2023-12-31")
* @param options - Options for date range creation
*/
export const createDateRange = (
export const createTimestampRange = (
spec: string | undefined,
options: CreateDateRangeOptions = {},
): DateRange | undefined => {
options: {
format?: string;
separator?: string;
} = {},
): TimestampRange | undefined => {
if (!spec) return undefined;
const { format = DEFAULT_DATE_FORMAT, separator = ":" } = options;
if (!separator) {
@@ -91,7 +93,7 @@ export const createDateRange = (
}
const normalized = spec.replace(separator, ":");
const parsedSpec = DateRangeSpecSchema.parse(normalized);
const parsedSpec = DateSpecSchema.parse(normalized);
const startDate = parseDate(parsedSpec.startRaw, format);
const endDate = parseDate(parsedSpec.endRaw, format);
@@ -101,7 +103,7 @@ export const createDateRange = (
start: getUnixTime(startDate),
};
return DateRangeSchema.parse(range);
return TimestampRangeSchema.parse(range);
};
/**
@@ -109,9 +111,9 @@ export const createDateRange = (
* @param range - The date range
* @param fmt - The date format (default: DEFAULT_DATE_FORMAT)
*/
export const formatDateRange = (range: DateRange, fmt = DEFAULT_DATE_FORMAT): string => {
const start = format(new Date(range.start * 1000), fmt);
const end = format(new Date(range.end * 1000), fmt);
export const formatTimestampRange = (range: TimestampRange, fmt = DEFAULT_DATE_FORMAT): string => {
const start = format(fromUnixTime(range.start), fmt);
const end = format(fromUnixTime(range.end), fmt);
return `${start}:${end}`;
};
@@ -128,7 +130,7 @@ export const formatPageRange = (range: PageRange): string => {
* @param range - The date range
* @param timestamp - The timestamp to check
*/
export const isTimestampInRange = (range: DateRange, timestamp: number): boolean => {
export const isTimestampInRange = (range: TimestampRange, timestamp: number): boolean => {
return range.start <= timestamp && timestamp <= range.end;
};
@@ -145,17 +147,3 @@ export const createAbsoluteUrl = (base: string, href: string): string => {
return href;
}
};
/**
* extract the domain name from a URL.
* @param url - The URL string
* @returns The domain name or null if invalid URL
*/
export const extractDomainName = (url: string): string | null => {
try {
const parsed = new URL(url);
return parsed.hostname;
} catch {
return null;
}
};
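A round-trip sketch of the renamed helpers, using the default "yyyy-LL-dd" format:

```ts
import { getUnixTime } from "date-fns";
import { createTimestampRange, formatTimestampRange, isTimestampInRange } from "#crawler/utils";

const range = createTimestampRange("2024-01-01:2024-06-30");
if (range) {
  formatTimestampRange(range); // "2024-01-01:2024-06-30"
  isTimestampInRange(range, getUnixTime(new Date("2024-03-15"))); // true
}
```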