feat(crawler): implement http client and persistence

This commit is contained in:
Bernard Ngandu
2025-11-02 21:55:53 +02:00
parent c53c0b576b
commit e8b20f6612
11 changed files with 816 additions and 5 deletions
+1
View File
@@ -15,6 +15,7 @@
"bullmq": "^4.17.0", "bullmq": "^4.17.0",
"date-fns": "catalog:", "date-fns": "catalog:",
"ioredis": "^5.3.2", "ioredis": "^5.3.2",
"node-html-parser": "^6.1.10",
"tiktoken": "^1.0.14", "tiktoken": "^1.0.14",
"zod": "catalog:" "zod": "catalog:"
} }
@@ -0,0 +1,91 @@
import { describe, expect, it, beforeEach, vi } from "vitest";
import { PipelineConfigManager } from "@/config";
import { registerCrawler, clearCrawlerRegistry, runSyncCrawl } from "@/process/crawler";
import { PipelineConfigSchema, SourceKindSchema } from "@/schema";
/** Build a minimal parsed pipeline config containing one registered HTML source. */
function createPipeline() {
  return PipelineConfigSchema.parse({
    paths: {
      root: ".",
      data: ".",
      logs: ".",
      configs: ".",
    },
    sources: {
      html: [
        {
          source_id: "demo",
          source_url: "https://example.com",
          source_kind: SourceKindSchema.enum.html,
          pagination_template: "/page/{page}",
        },
      ],
      wordpress: [],
    },
  });
}
// Unit tests for runSyncCrawl: crawler-registry dispatch, persistor lifecycle,
// and the error paths for unknown sources / unregistered crawler kinds.
describe("runSyncCrawl", () => {
  beforeEach(() => {
    // Reset the module-level crawler registry so each test starts clean.
    clearCrawlerRegistry();
  });
  it("invokes registered crawler factory", async () => {
    const pipeline = createPipeline();
    const fetch = vi.fn().mockResolvedValue(undefined);
    const close = vi.fn();
    registerCrawler(SourceKindSchema.enum.html, () => ({ fetch, close }));
    // Stub the config manager so no filesystem config lookup happens.
    const manager = {
      get: vi.fn().mockReturnValue(pipeline),
      setupLogging: vi.fn(),
    } as unknown as PipelineConfigManager;
    const persistClose = vi.fn();
    const persistFactory = vi.fn().mockReturnValue([
      { persist: vi.fn(), close: persistClose },
    ]);
    await runSyncCrawl({
      sourceId: "demo",
      env: "test",
      manager,
      persistFactory,
    });
    // Crawler fetch/close and persistor close must each run exactly once.
    expect(fetch).toHaveBeenCalledTimes(1);
    expect(close).toHaveBeenCalledTimes(1);
    // The factory receives the resolved source plus the resolved source id.
    expect(persistFactory).toHaveBeenCalledWith({
      pipeline,
      source: pipeline.sources.html[0],
      resolvedSourceId: "demo",
    });
    expect(persistClose).toHaveBeenCalledTimes(1);
  });
  it("throws when source is missing", async () => {
    const pipeline = createPipeline();
    registerCrawler(SourceKindSchema.enum.html, () => ({ fetch: vi.fn() }));
    const manager = {
      get: vi.fn().mockReturnValue(pipeline),
      setupLogging: vi.fn(),
    } as unknown as PipelineConfigManager;
    await expect(
      runSyncCrawl({ sourceId: "unknown", manager }),
    ).rejects.toThrow("Source 'unknown' not found");
  });
  it("throws when no crawler registered", async () => {
    const pipeline = createPipeline();
    const manager = {
      get: vi.fn().mockReturnValue(pipeline),
      setupLogging: vi.fn(),
    } as unknown as PipelineConfigManager;
    // No registerCrawler call here on purpose: registry is empty.
    await expect(
      runSyncCrawl({ sourceId: "demo", manager }),
    ).rejects.toThrow("No crawler registered");
  });
});
@@ -0,0 +1,83 @@
import { describe, expect, it, vi } from "vitest";
import { ClientConfigSchema } from "@/schema";
import { HttpError, SyncHttpClient } from "@/http/http-client";
/** Minimal client config with tiny timeouts/backoffs so retry tests run fast. */
function createConfig() {
  return ClientConfigSchema.parse({
    timeout: 1,
    max_retries: 2,
    backoff_initial: 0.001,
    backoff_multiplier: 2,
    backoff_max: 0.01,
  });
}
// Unit tests for SyncHttpClient: retry/backoff behaviour, Retry-After
// handling, error propagation, and request serialization.
describe("SyncHttpClient", () => {
  it("retries transient statuses", async () => {
    const config = createConfig();
    const sleep = vi.fn().mockResolvedValue(undefined);
    const fetchMock = vi
      .fn()
      .mockResolvedValueOnce(new Response("retry", { status: 503 }))
      // FIX: the body is the first positional Response argument; ResponseInit
      // has no `body` field, so `{ status: 200, body: "done" }` silently left
      // the body as "ok" and the text() assertion below could never pass.
      .mockResolvedValueOnce(new Response("done", { status: 200 }));
    const client = new SyncHttpClient(config, { fetchImpl: fetchMock, sleep });
    const response = await client.get("https://example.com");
    expect(await response.text()).toBe("done");
    // One transient failure + one success = two fetch calls with a sleep between.
    expect(fetchMock).toHaveBeenCalledTimes(2);
    expect(sleep).toHaveBeenCalled();
  });
  it("respects retry-after header", async () => {
    const config = createConfig();
    const sleep = vi.fn().mockResolvedValue(undefined);
    const fetchMock = vi
      .fn()
      .mockResolvedValueOnce(
        new Response("retry", { status: 503, headers: { "Retry-After": "3" } }),
      )
      .mockResolvedValueOnce(new Response("ok", { status: 200 }));
    const client = new SyncHttpClient(config, { fetchImpl: fetchMock, sleep });
    await client.get("https://example.com");
    // "Retry-After: 3" (seconds) must translate into a 3000ms sleep.
    expect(sleep).toHaveBeenCalledWith(3000);
  });
  it("throws http error on non transient failure", async () => {
    const config = createConfig();
    const fetchMock = vi
      .fn()
      .mockResolvedValueOnce(new Response("bad", { status: 404, statusText: "Not Found" }));
    const client = new SyncHttpClient(config, { fetchImpl: fetchMock });
    // 404 is not in TRANSIENT_HTTP_STATUSES: no retry, immediate HttpError.
    await expect(client.get("https://example.com"))
      .rejects.toBeInstanceOf(HttpError);
  });
  it("sends json payload and query params", async () => {
    const config = createConfig();
    const fetchMock = vi
      .fn()
      .mockResolvedValue(new Response("ok", { status: 200 }));
    const client = new SyncHttpClient(config, { fetchImpl: fetchMock });
    await client.post("https://example.com/api", {
      params: { page: 1, q: "news" },
      json: { hello: "world" },
      headers: { Authorization: "token" },
    });
    expect(fetchMock).toHaveBeenCalledTimes(1);
    const [url, init] = fetchMock.mock.calls[0]!;
    // Params are appended as query string; json body sets Content-Type.
    expect(url).toBe("https://example.com/api?page=1&q=news");
    expect(init?.method).toBe("POST");
    expect(init?.body).toBe(JSON.stringify({ hello: "world" }));
    expect((init?.headers as Record<string, string>)["Authorization"]).toBe("token");
    expect((init?.headers as Record<string, string>)["Content-Type"]).toBe(
      "application/json",
    );
  });
});
@@ -0,0 +1,49 @@
import { describe, expect, it, vi } from "vitest";
import { OpenGraphProvider } from "@/http/open-graph";
// Fixture: article page carrying full Open Graph tags plus the fallback
// elements (<title>, canonical <link>, first <img>) the extractor can use.
const sampleHtml = `
<!DOCTYPE html>
<html>
<head>
<title>Example Article</title>
<meta property="og:title" content="Open Graph Title" />
<meta property="og:description" content="Summary" />
<meta property="og:image" content="https://cdn.example.com/image.jpg" />
<meta property="og:url" content="https://example.com/article" />
<link rel="canonical" href="https://example.com/canonical" />
</head>
<body>
<img src="https://cdn.example.com/fallback.jpg" />
</body>
</html>
`;
// Tests for OpenGraphProvider: HTML extraction, the null fallback, and
// URL fetching through an injected client stub (no real network).
describe("OpenGraphProvider", () => {
  it("extracts metadata from html", () => {
    const metadata = OpenGraphProvider.consumeHtml(sampleHtml, "https://example.com");
    // og:* tags take priority over <title>/<img>/canonical fallbacks.
    expect(metadata).toEqual({
      title: "Open Graph Title",
      description: "Summary",
      image: "https://cdn.example.com/image.jpg",
      url: "https://example.com/article",
    });
  });
  it("falls back to null when no metadata present", () => {
    // No og tags, no <title>, no <img>, no canonical, no url argument.
    const empty = OpenGraphProvider.consumeHtml("<html><body></body></html>");
    expect(empty).toBeNull();
  });
  it("fetches metadata from url", async () => {
    const response = new Response(sampleHtml, { status: 200 });
    const get = vi.fn().mockResolvedValue(response);
    // Inject a stub client so consumeUrl never performs a real request.
    const provider = new OpenGraphProvider({ client: { get } });
    const metadata = await provider.consumeUrl("https://example.com/article");
    expect(get).toHaveBeenCalledWith("https://example.com/article");
    expect(metadata?.title).toBe("Open Graph Title");
  });
});
@@ -0,0 +1,27 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { JsonlPersistor } from "@/persistence";
// Integration-style test: JsonlPersistor must serialize concurrently issued
// persist() calls so both records land intact in the per-source .jsonl file.
describe("JsonlPersistor", () => {
  it("writes json lines sequentially", async () => {
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "jsonl-test-"));
    const persistor = new JsonlPersistor({ directory: tempDir, sourceId: "demo" });
    // Fired concurrently on purpose: the persistor chains writes internally.
    await Promise.all([
      persistor.persist({ id: 1, title: "first" }),
      persistor.persist({ id: 2, title: "second" }),
    ]);
    await persistor.close();
    // File name is derived from sourceId + default ".jsonl" suffix.
    const contents = fs.readFileSync(path.join(tempDir, "demo.jsonl"), "utf-8");
    const lines = contents.trim().split("\n").map((line) => JSON.parse(line));
    expect(lines).toContainEqual({ id: 1, title: "first" });
    expect(lines).toContainEqual({ id: 2, title: "second" });
  });
});
+8
View File
@@ -5,3 +5,11 @@ export const DEFAULT_CONFIG_FILES = [
path.join(process.cwd(), "config", "pipeline.json"), path.join(process.cwd(), "config", "pipeline.json"),
path.join(process.cwd(), "pipeline.json"), path.join(process.cwd(), "pipeline.json"),
]; ];
// User-Agent announced by Basango's own HTTP client by default.
export const DEFAULT_USER_AGENT =
  "Basango/0.1 (+https://github.com/bernard-ng/basango)";
// User-Agent widely whitelisted by sites for Open Graph scraping.
export const OPEN_GRAPH_USER_AGENT = "facebookexternalhit/1.1";
// HTTP statuses treated as retryable (rate limiting + transient server errors).
export const TRANSIENT_HTTP_STATUSES = [429, 500, 502, 503, 504] as const;
// Header consulted for server-driven retry delays (Headers.get is case-insensitive).
export const DEFAULT_RETRY_AFTER_HEADER = "retry-after";
@@ -0,0 +1,224 @@
import { setTimeout as delay } from "node:timers/promises";
import type { ClientConfig } from "@/schema";
import { DEFAULT_RETRY_AFTER_HEADER, DEFAULT_USER_AGENT, TRANSIENT_HTTP_STATUSES } from "@/constants";
import { UserAgents } from "@/http/user-agent";
/** Plain header map (keys kept as provided by the caller). */
export type HttpHeaders = Record<string, string>;
/** Query parameters; null/undefined entries are dropped when building URLs. */
export type HttpParams = Record<string, string | number | boolean | null | undefined>;
/** Raw request body payload. */
export type HttpData = unknown;
/** Construction-time knobs, mainly for dependency injection in tests. */
export interface HttpClientOptions {
  userAgentProvider?: UserAgents;
  // Merged over the computed User-Agent header.
  defaultHeaders?: HttpHeaders;
  // Override global fetch (tests, instrumentation).
  fetchImpl?: typeof fetch;
  // Override the backoff sleep (tests).
  sleep?: (ms: number) => Promise<void>;
}
/** Per-request options. */
export interface HttpRequestOptions {
  headers?: HttpHeaders;
  params?: HttpParams;
  // Raw body; superseded when `json` is provided.
  data?: HttpData;
  // JSON body; also sets Content-Type unless already present.
  json?: HttpData;
  // Header name to consult for server-driven retry delays.
  retryAfterHeader?: string;
}
/**
 * Error raised for non-retryable, non-2xx HTTP responses.
 * Exposes the failing status code and the raw Response for callers that need
 * headers or the body.
 */
export class HttpError extends Error {
  readonly status: number;
  readonly response: Response;
  constructor(message: string, response: Response) {
    super(message);
    // Name the subclass explicitly so logs and `err.name` checks are
    // meaningful (the inherited default would report "Error").
    this.name = "HttpError";
    this.status = response.status;
    this.response = response;
  }
}
/** Sleep helper: prefer Bun.sleep when running under Bun, else node timers. */
const defaultSleep = (ms: number): Promise<void> => {
  const hasBunSleep = typeof Bun !== "undefined" && typeof Bun.sleep === "function";
  return hasBunSleep ? Bun.sleep(ms) : delay(ms).then(() => undefined);
};
/**
 * Append query parameters to a URL. Null/undefined values are dropped;
 * everything else is stringified. Returns the input untouched when there
 * are no parameters.
 */
const buildUrl = (url: string, params?: HttpParams): string => {
  const entries = params ? Object.entries(params) : [];
  if (entries.length === 0) {
    return url;
  }
  const target = new URL(url);
  for (const [key, value] of entries) {
    if (value != null) {
      target.searchParams.set(key, String(value));
    }
  }
  return target.toString();
};
/**
 * Exponential backoff in milliseconds: initial * multiplier^attempt, capped
 * at backoff_max, plus up to 25% random jitter to avoid thundering herds.
 * Config values are in seconds; the return value is milliseconds.
 */
const computeBackoff = (config: ClientConfig, attempt: number): number => {
  const exponential = config.backoff_initial * Math.pow(config.backoff_multiplier, attempt);
  const base = Math.min(exponential, config.backoff_max);
  const jittered = base + Math.random() * base * 0.25;
  return jittered * 1000;
};
/**
 * Convert a Retry-After header to milliseconds to wait.
 * Accepts either delta-seconds ("3") or an HTTP-date; unparseable values and
 * dates in the past yield 0.
 */
const parseRetryAfter = (header: string): number => {
  const seconds = Number.parseInt(header, 10);
  if (!Number.isNaN(seconds)) {
    return Math.max(0, seconds * 1000);
  }
  const timestamp = Date.parse(header);
  if (Number.isNaN(timestamp)) {
    return 0;
  }
  return Math.max(0, timestamp - Date.now());
};
export class BaseHttpClient {
protected readonly config: ClientConfig;
protected readonly fetchImpl: typeof fetch;
protected readonly sleep: (ms: number) => Promise<void>;
protected readonly headers: HttpHeaders;
constructor(config: ClientConfig, options: HttpClientOptions = {}) {
this.config = config;
const provider =
options.userAgentProvider ??
new UserAgents(config.rotate, config.user_agent ?? DEFAULT_USER_AGENT);
const userAgent = provider.get() ?? config.user_agent ?? DEFAULT_USER_AGENT;
const baseHeaders: HttpHeaders = { "User-Agent": userAgent };
if (options.defaultHeaders) {
Object.assign(baseHeaders, options.defaultHeaders);
}
this.headers = baseHeaders;
this.fetchImpl = options.fetchImpl ?? fetch;
this.sleep = options.sleep ?? defaultSleep;
}
protected buildHeaders(headers?: HttpHeaders): HeadersInit {
return { ...this.headers, ...(headers ?? {}) };
}
protected async maybeDelay(
attempt: number,
response?: Response,
retryAfterHeader: string = DEFAULT_RETRY_AFTER_HEADER,
): Promise<void> {
let waitMs = 0;
if (response) {
const retryAfter = response.headers.get(retryAfterHeader);
if (retryAfter && this.config.respect_retry_after) {
waitMs = parseRetryAfter(retryAfter);
}
}
if (waitMs === 0) {
waitMs = computeBackoff(this.config, attempt);
}
if (waitMs > 0) {
await this.sleep(waitMs);
}
}
}
/**
 * HTTP client with per-attempt timeout, retry with backoff, and Retry-After
 * support. Transient statuses (429/5xx) are retried up to `max_retries`
 * times; other non-2xx responses raise HttpError immediately.
 */
export class SyncHttpClient extends BaseHttpClient {
  /**
   * Perform a request, retrying on transient statuses, timeouts, and network
   * failures.
   * @throws HttpError for non-transient, non-ok responses.
   * @throws the last timeout/network error once the retry budget is spent.
   */
  async request(
    method: string,
    url: string,
    options: HttpRequestOptions = {},
  ): Promise<Response> {
    const retryAfterHeader = options.retryAfterHeader ?? DEFAULT_RETRY_AFTER_HEADER;
    const target = buildUrl(url, options.params);
    // max_retries counts retries, so total attempts = retries + 1.
    const maxAttempts = this.config.max_retries + 1;
    let attempt = 0;
    let lastError: unknown;
    while (attempt < maxAttempts) {
      // Fresh controller per attempt so a timeout only aborts its own fetch.
      const controller = new AbortController();
      let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
      try {
        // config.timeout is expressed in seconds.
        timeoutHandle = setTimeout(() => controller.abort(), this.config.timeout * 1000);
        const headers = this.buildHeaders(options.headers);
        const init: RequestInit = {
          method,
          headers,
          body: options.data as BodyInit | undefined,
          signal: controller.signal,
          redirect: this.config.follow_redirects ? "follow" : "manual",
        };
        if (options.json !== undefined) {
          // `json` wins over `data`; set Content-Type unless caller provided
          // one under the exact "Content-Type" key.
          init.body = JSON.stringify(options.json);
          (init.headers as Record<string, string>)["Content-Type"] ??=
            "application/json";
        }
        const response = await this.fetchImpl(target, init);
        if (
          TRANSIENT_HTTP_STATUSES.includes(response.status as number) &&
          attempt < this.config.max_retries
        ) {
          // Retryable status with budget left: honor Retry-After / backoff.
          await this.maybeDelay(attempt, response, retryAfterHeader);
          attempt += 1;
          continue;
        }
        if (!response.ok) {
          throw new HttpError(`HTTP ${response.status} ${response.statusText}`, response);
        }
        return response;
      } catch (error) {
        if (error instanceof HttpError) {
          // Non-transient HTTP failure (or transient with budget spent):
          // terminal, no further retries.
          lastError = error;
          throw error;
        }
        if (error instanceof DOMException && error.name === "AbortError") {
          // Per-attempt timeout fired; retry unless the budget is exhausted.
          lastError = error;
          if (attempt >= this.config.max_retries) {
            throw error;
          }
        } else {
          // Network/other failure: same retry policy as timeouts.
          lastError = error;
          if (attempt >= this.config.max_retries) {
            throw error;
          }
        }
        // Backoff only — no response is available here to read Retry-After from.
        await this.maybeDelay(attempt);
        attempt += 1;
      } finally {
        if (timeoutHandle) {
          clearTimeout(timeoutHandle);
        }
      }
    }
    // The loop can only fall through after recording a failure; surface it.
    throw lastError instanceof Error
      ? lastError
      : new Error("HTTP request failed after retries");
  }
  /** GET helper; body-bearing options are excluded by the type. */
  get(url: string, options?: Omit<HttpRequestOptions, "data" | "json">): Promise<Response> {
    return this.request("GET", url, options);
  }
  /** POST helper. */
  post(url: string, options: HttpRequestOptions = {}): Promise<Response> {
    return this.request("POST", url, options);
  }
}
export type HttpClient = SyncHttpClient;
+112
View File
@@ -0,0 +1,112 @@
import { parse } from "node-html-parser";
import { OPEN_GRAPH_USER_AGENT } from "@/constants";
import type { ClientConfig } from "@/schema";
import { SyncHttpClient } from "@/http/http-client";
import { UserAgents } from "@/http/user-agent";
/** Normalized Open Graph fields; each resolves to null when not found. */
export interface OpenGraphMetadata {
  title?: string | null;
  description?: string | null;
  image?: string | null;
  url?: string | null;
}
/** Constructor options; injecting `client` bypasses real HTTP (tests). */
export interface OpenGraphProviderOptions {
  client?: Pick<SyncHttpClient, "get">;
  clientConfig?: ClientConfig;
  userAgentProvider?: UserAgents;
}
/** Return the first candidate that is non-empty after trimming, or null. */
const pick = (values: Array<string | null | undefined>): string | null => {
  const match = values.find((value) => value != null && value.trim().length > 0);
  return match ? match.trim() : null;
};
/** Read the content of a <meta> tag matched by property OR name attribute. */
const extractMeta = (root: ReturnType<typeof parse>, property: string): string | null => {
  const node = root.querySelector(
    `meta[property='${property}'], meta[name='${property}']`,
  );
  return node?.getAttribute("content") ?? null;
};
/**
 * Extracts Open Graph metadata (title/description/image/url) from HTML,
 * either from a raw document or by fetching a URL with an OG-friendly
 * User-Agent.
 */
export class OpenGraphProvider {
  private readonly client: Pick<SyncHttpClient, "get">;
  constructor(options: OpenGraphProviderOptions = {}) {
    const provider =
      options.userAgentProvider ?? new UserAgents(false, OPEN_GRAPH_USER_AGENT);
    const clientConfig: ClientConfig =
      options.clientConfig ?? ({
        timeout: 20,
        user_agent: OPEN_GRAPH_USER_AGENT,
        follow_redirects: true,
        verify_ssl: true,
        rotate: false,
        max_retries: 2,
        backoff_initial: 1,
        backoff_multiplier: 2,
        backoff_max: 5,
        respect_retry_after: true,
      } satisfies ClientConfig);
    this.client =
      options.client ??
      new SyncHttpClient(clientConfig, {
        userAgentProvider: provider,
        // FIX: og() is a *static* method on UserAgents; calling it on the
        // instance (`provider.og()`) was a type error and failed at runtime.
        defaultHeaders: { "User-Agent": UserAgents.og() },
      });
  }
  /** Fetch `url` and extract metadata; resolves null on any fetch/parse failure. */
  async consumeUrl(url: string): Promise<OpenGraphMetadata | null> {
    try {
      const response = await this.client.get(url);
      const html = await response.text();
      return OpenGraphProvider.consumeHtml(html, url);
    } catch {
      // Best-effort: OG metadata is optional, so network/parse errors yield null.
      return null;
    }
  }
  /**
   * Parse metadata out of an HTML document, falling back from og:* tags to
   * <title>, the first <img>, and the canonical link (then the supplied url).
   * Returns null when no field could be resolved at all.
   */
  static consumeHtml(html: string, url?: string): OpenGraphMetadata | null {
    if (!html) {
      return null;
    }
    const root = parse(html);
    const title = pick([
      extractMeta(root, "og:title"),
      root.querySelector("title")?.text,
    ]);
    const description = pick([
      extractMeta(root, "og:description"),
      extractMeta(root, "description"),
    ]);
    const image = pick([
      extractMeta(root, "og:image"),
      root.querySelector("img")?.getAttribute("src") ?? null,
    ]);
    const canonical = pick([
      extractMeta(root, "og:url"),
      root.querySelector("link[rel='canonical']")?.getAttribute("href") ?? null,
      url ?? null,
    ]);
    if (!title && !description && !image && !canonical) {
      return null;
    }
    return {
      title,
      description,
      image,
      url: canonical,
    };
  }
}
+4 -5
View File
@@ -1,3 +1,5 @@
import { DEFAULT_USER_AGENT, OPEN_GRAPH_USER_AGENT } from "@/constants";
export class UserAgents { export class UserAgents {
private static readonly USER_AGENTS: string[] = [ private static readonly USER_AGENTS: string[] = [
"Mozilla/5.0 (iPhone; CPU iPhone OS 10_4_8; like Mac OS X) AppleWebKit/603.39 (KHTML, like Gecko) Chrome/52.0.3638.271 Mobile Safari/537.5", "Mozilla/5.0 (iPhone; CPU iPhone OS 10_4_8; like Mac OS X) AppleWebKit/603.39 (KHTML, like Gecko) Chrome/52.0.3638.271 Mobile Safari/537.5",
@@ -15,16 +17,13 @@ export class UserAgents {
private readonly rotate: boolean; private readonly rotate: boolean;
private readonly fallback: string; private readonly fallback: string;
constructor( constructor(rotate: boolean = true, fallback: string = DEFAULT_USER_AGENT) {
rotate: boolean = true,
fallback: string = "Basango/0.1 (+https://github.com/bernard-ng/basango)",
) {
this.rotate = rotate; this.rotate = rotate;
this.fallback = fallback; this.fallback = fallback;
} }
static og(): string { static og(): string {
return "facebookexternalhit/1.1"; return OPEN_GRAPH_USER_AGENT;
} }
get(): string { get(): string {
+59
View File
@@ -0,0 +1,59 @@
import fs from "node:fs";
import path from "node:path";
/** Arbitrary JSON-serializable record produced by a crawler. */
export interface PersistedRecord {
  [key: string]: unknown;
}
/** Sink that stores crawled records; close() flushes and finalizes output. */
export interface Persistor {
  persist(record: PersistedRecord): Promise<void> | void;
  close?: () => Promise<void> | void;
}
/** Construction options for JsonlPersistor. */
export interface JsonlPersistorOptions {
  // Target directory; created recursively if missing.
  directory: string;
  // Base file name (one output file per source).
  sourceId: string;
  // File extension; defaults to ".jsonl".
  suffix?: string;
  // Text encoding; defaults to "utf-8".
  encoding?: BufferEncoding;
}
/**
 * Append-only JSONL sink: one JSON object per line in
 * `<directory>/<sourceId><suffix>`. Concurrent persist() calls are chained
 * onto an internal promise so writes never interleave.
 */
export class JsonlPersistor implements Persistor {
  private readonly filePath: string;
  private readonly encoding: BufferEncoding;
  // Serialization chain: each write starts only after the previous settles.
  private pending: Promise<void> = Promise.resolve();
  private closed = false;

  constructor(options: JsonlPersistorOptions) {
    const suffix = options.suffix ?? ".jsonl";
    this.encoding = options.encoding ?? "utf-8";
    fs.mkdirSync(options.directory, { recursive: true });
    this.filePath = path.join(options.directory, `${options.sourceId}${suffix}`);
    if (!fs.existsSync(this.filePath)) {
      fs.writeFileSync(this.filePath, "", { encoding: this.encoding });
    }
  }

  /**
   * Queue one record for appending as a single JSON line.
   * Rejects once the persistor has been closed.
   */
  persist(record: PersistedRecord): Promise<void> {
    if (this.closed) {
      return Promise.reject(new Error("Persistor has been closed"));
    }
    const payload = `${JSON.stringify(record)}\n`;
    // FIX: Bun.write has no `append` option and truncates the destination,
    // so every persist() clobbered earlier records. fs.promises.appendFile
    // appends correctly and works on both Node and Bun.
    this.pending = this.pending.then(() =>
      fs.promises.appendFile(this.filePath, payload, { encoding: this.encoding }),
    );
    return this.pending;
  }

  /** Reject further writes and wait for queued writes to finish. */
  async close(): Promise<void> {
    this.closed = true;
    await this.pending;
  }
}
export type { JsonlPersistorOptions as JsonlOptions };
+158
View File
@@ -0,0 +1,158 @@
import { logger } from "@basango/logger";
import { PipelineConfigManager } from "@/config";
import { JsonlPersistor, Persistor } from "@/persistence";
import {
AnySourceConfig,
ClientConfig,
CrawlerConfig,
CrawlerConfigSchema,
PipelineConfig,
SourceKind,
} from "@/schema";
import { createDateRange } from "@/utils";
import { PageRangeSchema, PageRangeSpecSchema } from "@/schema";
/** Minimal crawler contract: fetch everything, then optionally clean up. */
export interface CrawlerInstance {
  fetch: () => Promise<void> | void;
  close?: () => Promise<void> | void;
}
/** Everything a crawler factory needs to build a crawler for one source. */
export interface CrawlerContext {
  pipeline: PipelineConfig;
  source: AnySourceConfig;
  clientConfig: ClientConfig;
  crawlerConfig: CrawlerConfig;
  persistors: Persistor[];
}
/** Builds a crawler instance for a given run context. */
export type CrawlerFactory = (context: CrawlerContext) => CrawlerInstance;
// Module-level mapping from source kind to its crawler factory.
const registry = new Map<SourceKind, CrawlerFactory>();

/** Register (or replace) the crawler factory for a source kind. */
export function registerCrawler(kind: SourceKind, factory: CrawlerFactory): void {
  registry.set(kind, factory);
}

/** Drop all registered factories (used by tests for isolation). */
export function clearCrawlerRegistry(): void {
  registry.clear();
}
/** Options for a one-shot crawl of a single configured source. */
export interface RunSyncCrawlOptions {
  // Id of the source to crawl (must exist in the pipeline config).
  sourceId: string;
  // Config environment; defaults to "development".
  env?: string;
  // Optional page-range spec string (parsed by PageRangeSpecSchema).
  pageRange?: string | null;
  // Optional date-range spec string (parsed by createDateRange).
  dateRange?: string | null;
  category?: string | null;
  notify?: boolean;
  // Injectable for tests; defaults to a fresh PipelineConfigManager.
  manager?: PipelineConfigManager;
  // Injectable persistor factory; defaults to a single JsonlPersistor.
  persistFactory?: (context: {
    pipeline: PipelineConfig;
    source: AnySourceConfig;
    resolvedSourceId: string;
  }) => Persistor[];
}
/** Parse a page-range spec string into a validated PageRange, or undefined. */
const resolvePageRange = (spec?: string | null) => {
  if (!spec) {
    return undefined;
  }
  return PageRangeSchema.parse(PageRangeSpecSchema.parse(spec));
};
/**
 * Assemble and validate the CrawlerConfig for one run from the source config
 * plus optional overrides (page range, date range, category, notify).
 */
const resolveCrawlerConfig = (
  source: AnySourceConfig,
  options: RunSyncCrawlOptions,
): CrawlerConfig => {
  const dateRange = options.dateRange ? createDateRange(options.dateRange) : undefined;
  return CrawlerConfigSchema.parse({
    source,
    page_range: resolvePageRange(options.pageRange),
    date_range: dateRange,
    category: options.category ?? undefined,
    notify: options.notify ?? false,
  });
};
/**
 * Build the persistors for a crawl: delegate to the caller-supplied factory
 * when given, otherwise default to a single JSONL file under paths.data.
 */
const createPersistors = (
  context: { pipeline: PipelineConfig; source: AnySourceConfig; sourceId: string },
  factory?: RunSyncCrawlOptions["persistFactory"],
): Persistor[] => {
  const { pipeline, source, sourceId } = context;
  if (!factory) {
    return [
      new JsonlPersistor({
        directory: pipeline.paths.data,
        sourceId,
      }),
    ];
  }
  return factory({ pipeline, source, resolvedSourceId: sourceId });
};
/**
 * Run a one-shot (synchronous) crawl for a single configured source.
 *
 * Resolves config via PipelineConfigManager, looks up the factory registered
 * for the source's kind, builds persistors, runs fetch(), and always closes
 * persistors and the crawler afterwards (best-effort).
 *
 * @throws Error when the source id is unknown, no crawler is registered for
 *         the source's kind, or the factory returns an invalid instance.
 */
export const runSyncCrawl = async (options: RunSyncCrawlOptions): Promise<void> => {
  const env = options.env ?? "development";
  const manager = options.manager ?? new PipelineConfigManager({ env });
  const pipeline = manager.get(env);
  manager.setupLogging(pipeline);
  // NOTE(review): assumes pipeline.sources exposes a find(id) helper — the
  // test fixture parses a plain {html, wordpress} object; confirm the schema
  // attaches this method.
  const source = pipeline.sources.find(options.sourceId);
  if (!source) {
    throw new Error(`Source '${options.sourceId}' not found in configuration`);
  }
  const crawlerConfig = resolveCrawlerConfig(source, options);
  // Prefer the canonical id from config; fall back to the requested id.
  const sourceId = source.source_id ?? options.sourceId;
  const persistors = createPersistors({ pipeline, source, sourceId }, options.persistFactory);
  const factory = registry.get(source.source_kind as SourceKind);
  if (!factory) {
    throw new Error(`No crawler registered for source kind '${source.source_kind}'`);
  }
  const context: CrawlerContext = {
    pipeline,
    source,
    clientConfig: pipeline.fetch.client,
    crawlerConfig,
    persistors,
  };
  const crawler = factory(context);
  if (!crawler || typeof crawler.fetch !== "function") {
    throw new Error("Registered crawler did not return a valid instance");
  }
  try {
    await crawler.fetch();
    logger.info(
      {
        sourceId: options.sourceId,
        kind: source.source_kind,
        env,
      },
      "Synchronous crawl completed",
    );
  } finally {
    // Cleanup is best-effort: a failing close must not mask a crawl error.
    for (const persistor of persistors) {
      try {
        await persistor.close?.();
      } catch (error) {
        logger.warn({ error }, "Failed to close persistor");
      }
    }
    if (typeof crawler.close === "function") {
      try {
        await crawler.close();
      } catch (error) {
        logger.warn({ error }, "Failed to close crawler");
      }
    }
  }
};