Merge pull request #7 from bernard-ng/codex/implement-missing-features-in-crawler
feat(crawler): add http client and persistence utilities
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
"bullmq": "^4.17.0",
|
||||
"date-fns": "catalog:",
|
||||
"ioredis": "^5.3.2",
|
||||
"node-html-parser": "^6.1.10",
|
||||
"tiktoken": "^1.0.14",
|
||||
"zod": "catalog:"
|
||||
}
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
import { describe, expect, it, beforeEach, vi } from "vitest";
|
||||
|
||||
import { PipelineConfigManager } from "@/config";
|
||||
import { registerCrawler, clearCrawlerRegistry, runSyncCrawl } from "@/process/crawler";
|
||||
import { PipelineConfigSchema, SourceKindSchema } from "@/schema";
|
||||
|
||||
// Builds a minimal pipeline config with one registered HTML source, validated
// through the real schema so tests exercise genuine parsed config shapes.
const createPipeline = () =>
  PipelineConfigSchema.parse({
    paths: {
      root: ".",
      data: ".",
      logs: ".",
      configs: ".",
    },
    sources: {
      html: [
        {
          source_id: "demo",
          source_url: "https://example.com",
          source_kind: SourceKindSchema.enum.html,
          pagination_template: "/page/{page}",
        },
      ],
      wordpress: [],
    },
  });
|
||||
|
||||
describe("runSyncCrawl", () => {
  beforeEach(() => {
    // The crawler registry is module-level state; reset it between tests.
    clearCrawlerRegistry();
  });

  it("invokes registered crawler factory", async () => {
    const pipeline = createPipeline();
    const fetch = vi.fn().mockResolvedValue(undefined);
    const close = vi.fn();

    registerCrawler(SourceKindSchema.enum.html, () => ({ fetch, close }));

    // Stub only the two manager members runSyncCrawl touches.
    const manager = {
      get: vi.fn().mockReturnValue(pipeline),
      setupLogging: vi.fn(),
    } as unknown as PipelineConfigManager;

    const persistClose = vi.fn();
    const persistFactory = vi.fn().mockReturnValue([
      { persist: vi.fn(), close: persistClose },
    ]);

    await runSyncCrawl({
      sourceId: "demo",
      env: "test",
      manager,
      persistFactory,
    });

    // Crawler ran exactly once and both crawler and persistor were closed.
    expect(fetch).toHaveBeenCalledTimes(1);
    expect(close).toHaveBeenCalledTimes(1);
    expect(persistFactory).toHaveBeenCalledWith({
      pipeline,
      source: pipeline.sources.html[0],
      resolvedSourceId: "demo",
    });
    expect(persistClose).toHaveBeenCalledTimes(1);
  });

  it("throws when source is missing", async () => {
    const pipeline = createPipeline();
    registerCrawler(SourceKindSchema.enum.html, () => ({ fetch: vi.fn() }));
    const manager = {
      get: vi.fn().mockReturnValue(pipeline),
      setupLogging: vi.fn(),
    } as unknown as PipelineConfigManager;

    // toThrow matches on substring of the full error message.
    await expect(
      runSyncCrawl({ sourceId: "unknown", manager }),
    ).rejects.toThrow("Source 'unknown' not found");
  });

  it("throws when no crawler registered", async () => {
    // No registerCrawler call here — the registry is empty after beforeEach.
    const pipeline = createPipeline();
    const manager = {
      get: vi.fn().mockReturnValue(pipeline),
      setupLogging: vi.fn(),
    } as unknown as PipelineConfigManager;

    await expect(
      runSyncCrawl({ sourceId: "demo", manager }),
    ).rejects.toThrow("No crawler registered");
  });
});
|
||||
@@ -0,0 +1,83 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import { ClientConfigSchema } from "@/schema";
|
||||
import { HttpError, SyncHttpClient } from "@/http/http-client";
|
||||
|
||||
// Tiny timeout/backoff values keep the retry tests fast while still running
// the values through the real schema validation.
const createConfig = () =>
  ClientConfigSchema.parse({
    timeout: 1,
    max_retries: 2,
    backoff_initial: 0.001,
    backoff_multiplier: 2,
    backoff_max: 0.01,
  });
|
||||
|
||||
describe("SyncHttpClient", () => {
|
||||
it("retries transient statuses", async () => {
|
||||
const config = createConfig();
|
||||
const sleep = vi.fn().mockResolvedValue(undefined);
|
||||
const fetchMock = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce(new Response("retry", { status: 503 }))
|
||||
.mockResolvedValueOnce(new Response("ok", { status: 200, body: "done" }));
|
||||
|
||||
const client = new SyncHttpClient(config, { fetchImpl: fetchMock, sleep });
|
||||
const response = await client.get("https://example.com");
|
||||
|
||||
expect(await response.text()).toBe("done");
|
||||
expect(fetchMock).toHaveBeenCalledTimes(2);
|
||||
expect(sleep).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("respects retry-after header", async () => {
|
||||
const config = createConfig();
|
||||
const sleep = vi.fn().mockResolvedValue(undefined);
|
||||
const fetchMock = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce(
|
||||
new Response("retry", { status: 503, headers: { "Retry-After": "3" } }),
|
||||
)
|
||||
.mockResolvedValueOnce(new Response("ok", { status: 200 }));
|
||||
|
||||
const client = new SyncHttpClient(config, { fetchImpl: fetchMock, sleep });
|
||||
await client.get("https://example.com");
|
||||
|
||||
expect(sleep).toHaveBeenCalledWith(3000);
|
||||
});
|
||||
|
||||
it("throws http error on non transient failure", async () => {
|
||||
const config = createConfig();
|
||||
const fetchMock = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce(new Response("bad", { status: 404, statusText: "Not Found" }));
|
||||
|
||||
const client = new SyncHttpClient(config, { fetchImpl: fetchMock });
|
||||
|
||||
await expect(client.get("https://example.com"))
|
||||
.rejects.toBeInstanceOf(HttpError);
|
||||
});
|
||||
|
||||
it("sends json payload and query params", async () => {
|
||||
const config = createConfig();
|
||||
const fetchMock = vi
|
||||
.fn()
|
||||
.mockResolvedValue(new Response("ok", { status: 200 }));
|
||||
|
||||
const client = new SyncHttpClient(config, { fetchImpl: fetchMock });
|
||||
await client.post("https://example.com/api", {
|
||||
params: { page: 1, q: "news" },
|
||||
json: { hello: "world" },
|
||||
headers: { Authorization: "token" },
|
||||
});
|
||||
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
const [url, init] = fetchMock.mock.calls[0]!;
|
||||
expect(url).toBe("https://example.com/api?page=1&q=news");
|
||||
expect(init?.method).toBe("POST");
|
||||
expect(init?.body).toBe(JSON.stringify({ hello: "world" }));
|
||||
expect((init?.headers as Record<string, string>)["Authorization"]).toBe("token");
|
||||
expect((init?.headers as Record<string, string>)["Content-Type"]).toBe(
|
||||
"application/json",
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,49 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import { OpenGraphProvider } from "@/http/open-graph";
|
||||
|
||||
// Representative page carrying both og:* tags and the fallback elements the
// provider can use (<title>, first <img>, canonical <link>).
const sampleHtml = `
<!DOCTYPE html>
<html>
<head>
<title>Example Article</title>
<meta property="og:title" content="Open Graph Title" />
<meta property="og:description" content="Summary" />
<meta property="og:image" content="https://cdn.example.com/image.jpg" />
<meta property="og:url" content="https://example.com/article" />
<link rel="canonical" href="https://example.com/canonical" />
</head>
<body>
<img src="https://cdn.example.com/fallback.jpg" />
</body>
</html>
`;

describe("OpenGraphProvider", () => {
  it("extracts metadata from html", () => {
    const metadata = OpenGraphProvider.consumeHtml(sampleHtml, "https://example.com");

    // og:* values win over the <title>/<img>/canonical fallbacks.
    expect(metadata).toEqual({
      title: "Open Graph Title",
      description: "Summary",
      image: "https://cdn.example.com/image.jpg",
      url: "https://example.com/article",
    });
  });

  it("falls back to null when no metadata present", () => {
    const empty = OpenGraphProvider.consumeHtml("<html><body></body></html>");
    expect(empty).toBeNull();
  });

  it("fetches metadata from url", async () => {
    const response = new Response(sampleHtml, { status: 200 });
    const get = vi.fn().mockResolvedValue(response);

    // Inject a stub client so no real network I/O happens.
    const provider = new OpenGraphProvider({ client: { get } });
    const metadata = await provider.consumeUrl("https://example.com/article");

    expect(get).toHaveBeenCalledWith("https://example.com/article");
    expect(metadata?.title).toBe("Open Graph Title");
  });
});
|
||||
@@ -0,0 +1,27 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { JsonlPersistor } from "@/persistence";
|
||||
|
||||
describe("JsonlPersistor", () => {
  it("writes json lines sequentially", async () => {
    // Real filesystem, isolated per-run temp directory.
    const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "jsonl-test-"));
    const persistor = new JsonlPersistor({ directory: tempDir, sourceId: "demo" });

    // Concurrent persists must BOTH survive: this exercises append semantics
    // and the persistor's internal write queue.
    await Promise.all([
      persistor.persist({ id: 1, title: "first" }),
      persistor.persist({ id: 2, title: "second" }),
    ]);

    await persistor.close();

    const contents = fs.readFileSync(path.join(tempDir, "demo.jsonl"), "utf-8");
    const lines = contents.trim().split("\n").map((line) => JSON.parse(line));

    // Order-independent membership check (queue guarantees order, but the
    // test only needs both records present).
    expect(lines).toContainEqual({ id: 1, title: "first" });
    expect(lines).toContainEqual({ id: 2, title: "second" });
  });
});
|
||||
@@ -5,3 +5,11 @@ export const DEFAULT_CONFIG_FILES = [
|
||||
path.join(process.cwd(), "config", "pipeline.json"),
|
||||
path.join(process.cwd(), "pipeline.json"),
|
||||
];
|
||||
|
||||
// Default User-Agent advertised by the crawler's HTTP client.
export const DEFAULT_USER_AGENT =
  "Basango/0.1 (+https://github.com/bernard-ng/basango)";
// Facebook's scraper UA — commonly used when requesting Open Graph metadata.
export const OPEN_GRAPH_USER_AGENT = "facebookexternalhit/1.1";

// HTTP statuses worth retrying: rate limiting plus transient server errors.
export const TRANSIENT_HTTP_STATUSES = [429, 500, 502, 503, 504] as const;

// Response header consulted for server-requested retry delays.
export const DEFAULT_RETRY_AFTER_HEADER = "retry-after";
|
||||
|
||||
@@ -0,0 +1,224 @@
|
||||
import { setTimeout as delay } from "node:timers/promises";
|
||||
|
||||
import type { ClientConfig } from "@/schema";
|
||||
import { DEFAULT_RETRY_AFTER_HEADER, DEFAULT_USER_AGENT, TRANSIENT_HTTP_STATUSES } from "@/constants";
|
||||
import { UserAgents } from "@/http/user-agent";
|
||||
|
||||
/** Plain header map merged into every request. */
export type HttpHeaders = Record<string, string>;
/** Query-string parameters; null/undefined entries are dropped at URL build time. */
export type HttpParams = Record<string, string | number | boolean | null | undefined>;
/** Raw request payload, passed through to fetch as-is. */
export type HttpData = unknown;

/** Construction-time knobs; fetchImpl/sleep exist mainly to inject fakes in tests. */
export interface HttpClientOptions {
  userAgentProvider?: UserAgents;
  defaultHeaders?: HttpHeaders;
  fetchImpl?: typeof fetch;
  sleep?: (ms: number) => Promise<void>;
}

/** Per-request options. `json` overrides `data` and defaults the Content-Type. */
export interface HttpRequestOptions {
  headers?: HttpHeaders;
  params?: HttpParams;
  data?: HttpData;
  json?: HttpData;
  retryAfterHeader?: string; // override of DEFAULT_RETRY_AFTER_HEADER
}
|
||||
|
||||
export class HttpError extends Error {
|
||||
readonly status: number;
|
||||
readonly response: Response;
|
||||
|
||||
constructor(message: string, response: Response) {
|
||||
super(message);
|
||||
this.status = response.status;
|
||||
this.response = response;
|
||||
}
|
||||
}
|
||||
|
||||
const defaultSleep = (ms: number): Promise<void> => {
|
||||
if (typeof Bun !== "undefined" && typeof Bun.sleep === "function") {
|
||||
return Bun.sleep(ms);
|
||||
}
|
||||
return delay(ms).then(() => undefined);
|
||||
};
|
||||
|
||||
const buildUrl = (url: string, params?: HttpParams): string => {
|
||||
if (!params || Object.keys(params).length === 0) {
|
||||
return url;
|
||||
}
|
||||
|
||||
const target = new URL(url);
|
||||
for (const [key, value] of Object.entries(params)) {
|
||||
if (value === undefined || value === null) continue;
|
||||
target.searchParams.set(key, String(value));
|
||||
}
|
||||
|
||||
return target.toString();
|
||||
};
|
||||
|
||||
const computeBackoff = (config: ClientConfig, attempt: number): number => {
|
||||
const base = Math.min(
|
||||
config.backoff_initial * Math.pow(config.backoff_multiplier, attempt),
|
||||
config.backoff_max,
|
||||
);
|
||||
const jitter = Math.random() * base * 0.25;
|
||||
return (base + jitter) * 1000;
|
||||
};
|
||||
|
||||
const parseRetryAfter = (header: string): number => {
|
||||
const numeric = Number.parseInt(header, 10);
|
||||
if (!Number.isNaN(numeric)) {
|
||||
return Math.max(0, numeric * 1000);
|
||||
}
|
||||
|
||||
const parsed = Date.parse(header);
|
||||
if (Number.isNaN(parsed)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const delta = parsed - Date.now();
|
||||
return delta > 0 ? delta : 0;
|
||||
};
|
||||
|
||||
/**
 * Shared plumbing for HTTP clients: resolved base headers (User-Agent),
 * injectable fetch/sleep implementations, and retry-delay computation.
 */
export class BaseHttpClient {
  protected readonly config: ClientConfig;
  protected readonly fetchImpl: typeof fetch; // injectable for tests
  protected readonly sleep: (ms: number) => Promise<void>; // injectable for tests
  protected readonly headers: HttpHeaders; // base headers sent on every request

  constructor(config: ClientConfig, options: HttpClientOptions = {}) {
    this.config = config;
    // User-Agent resolution: explicit provider > provider built from config.
    const provider =
      options.userAgentProvider ??
      new UserAgents(config.rotate, config.user_agent ?? DEFAULT_USER_AGENT);
    const userAgent = provider.get() ?? config.user_agent ?? DEFAULT_USER_AGENT;

    const baseHeaders: HttpHeaders = { "User-Agent": userAgent };
    if (options.defaultHeaders) {
      // Caller-supplied defaults may override the User-Agent set above.
      Object.assign(baseHeaders, options.defaultHeaders);
    }

    this.headers = baseHeaders;
    this.fetchImpl = options.fetchImpl ?? fetch;
    this.sleep = options.sleep ?? defaultSleep;
  }

  /** Merges per-request headers over the client-wide defaults. */
  protected buildHeaders(headers?: HttpHeaders): HeadersInit {
    return { ...this.headers, ...(headers ?? {}) };
  }

  /**
   * Sleeps before the next retry attempt. Honors the server's Retry-After
   * header when `respect_retry_after` is set; otherwise (or when the header
   * is missing/unparsable) falls back to computed exponential backoff.
   */
  protected async maybeDelay(
    attempt: number,
    response?: Response,
    retryAfterHeader: string = DEFAULT_RETRY_AFTER_HEADER,
  ): Promise<void> {
    let waitMs = 0;

    if (response) {
      const retryAfter = response.headers.get(retryAfterHeader);
      if (retryAfter && this.config.respect_retry_after) {
        waitMs = parseRetryAfter(retryAfter);
      }
    }

    // No usable Retry-After: compute backoff from attempt number instead.
    if (waitMs === 0) {
      waitMs = computeBackoff(this.config, attempt);
    }

    if (waitMs > 0) {
      await this.sleep(waitMs);
    }
  }
}
|
||||
|
||||
export class SyncHttpClient extends BaseHttpClient {
|
||||
async request(
|
||||
method: string,
|
||||
url: string,
|
||||
options: HttpRequestOptions = {},
|
||||
): Promise<Response> {
|
||||
const retryAfterHeader = options.retryAfterHeader ?? DEFAULT_RETRY_AFTER_HEADER;
|
||||
const target = buildUrl(url, options.params);
|
||||
|
||||
const maxAttempts = this.config.max_retries + 1;
|
||||
let attempt = 0;
|
||||
let lastError: unknown;
|
||||
|
||||
while (attempt < maxAttempts) {
|
||||
const controller = new AbortController();
|
||||
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
|
||||
try {
|
||||
timeoutHandle = setTimeout(() => controller.abort(), this.config.timeout * 1000);
|
||||
|
||||
const headers = this.buildHeaders(options.headers);
|
||||
const init: RequestInit = {
|
||||
method,
|
||||
headers,
|
||||
body: options.data as BodyInit | undefined,
|
||||
signal: controller.signal,
|
||||
redirect: this.config.follow_redirects ? "follow" : "manual",
|
||||
};
|
||||
|
||||
if (options.json !== undefined) {
|
||||
init.body = JSON.stringify(options.json);
|
||||
(init.headers as Record<string, string>)["Content-Type"] ??=
|
||||
"application/json";
|
||||
}
|
||||
|
||||
const response = await this.fetchImpl(target, init);
|
||||
|
||||
if (
|
||||
TRANSIENT_HTTP_STATUSES.includes(response.status as number) &&
|
||||
attempt < this.config.max_retries
|
||||
) {
|
||||
await this.maybeDelay(attempt, response, retryAfterHeader);
|
||||
attempt += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
throw new HttpError(`HTTP ${response.status} ${response.statusText}`, response);
|
||||
}
|
||||
|
||||
return response;
|
||||
} catch (error) {
|
||||
if (error instanceof HttpError) {
|
||||
lastError = error;
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (error instanceof DOMException && error.name === "AbortError") {
|
||||
lastError = error;
|
||||
if (attempt >= this.config.max_retries) {
|
||||
throw error;
|
||||
}
|
||||
} else {
|
||||
lastError = error;
|
||||
if (attempt >= this.config.max_retries) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
await this.maybeDelay(attempt);
|
||||
attempt += 1;
|
||||
} finally {
|
||||
if (timeoutHandle) {
|
||||
clearTimeout(timeoutHandle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError instanceof Error
|
||||
? lastError
|
||||
: new Error("HTTP request failed after retries");
|
||||
}
|
||||
|
||||
get(url: string, options?: Omit<HttpRequestOptions, "data" | "json">): Promise<Response> {
|
||||
return this.request("GET", url, options);
|
||||
}
|
||||
|
||||
post(url: string, options: HttpRequestOptions = {}): Promise<Response> {
|
||||
return this.request("POST", url, options);
|
||||
}
|
||||
}
|
||||
|
||||
export type HttpClient = SyncHttpClient;
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
import { parse } from "node-html-parser";
|
||||
|
||||
import { OPEN_GRAPH_USER_AGENT } from "@/constants";
|
||||
import type { ClientConfig } from "@/schema";
|
||||
import { SyncHttpClient } from "@/http/http-client";
|
||||
import { UserAgents } from "@/http/user-agent";
|
||||
|
||||
/** Extracted Open Graph fields; each may be null/absent when not found. */
export interface OpenGraphMetadata {
  title?: string | null;
  description?: string | null;
  image?: string | null;
  url?: string | null; // canonical URL (og:url > <link rel=canonical> > input)
}

/** Constructor options; a supplied `client` fully replaces the built-in one. */
export interface OpenGraphProviderOptions {
  client?: Pick<SyncHttpClient, "get">;
  clientConfig?: ClientConfig;
  userAgentProvider?: UserAgents;
}
|
||||
|
||||
const pick = (values: Array<string | null | undefined>): string | null => {
|
||||
for (const value of values) {
|
||||
if (value && value.trim().length > 0) {
|
||||
return value.trim();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
const extractMeta = (root: ReturnType<typeof parse>, property: string): string | null => {
|
||||
const selector = `meta[property='${property}'], meta[name='${property}']`;
|
||||
const node = root.querySelector(selector);
|
||||
if (!node) {
|
||||
return null;
|
||||
}
|
||||
return node.getAttribute("content") ?? null;
|
||||
};
|
||||
|
||||
export class OpenGraphProvider {
|
||||
private readonly client: Pick<SyncHttpClient, "get">;
|
||||
|
||||
constructor(options: OpenGraphProviderOptions = {}) {
|
||||
const provider =
|
||||
options.userAgentProvider ?? new UserAgents(false, OPEN_GRAPH_USER_AGENT);
|
||||
const clientConfig: ClientConfig =
|
||||
options.clientConfig ?? ({
|
||||
timeout: 20,
|
||||
user_agent: OPEN_GRAPH_USER_AGENT,
|
||||
follow_redirects: true,
|
||||
verify_ssl: true,
|
||||
rotate: false,
|
||||
max_retries: 2,
|
||||
backoff_initial: 1,
|
||||
backoff_multiplier: 2,
|
||||
backoff_max: 5,
|
||||
respect_retry_after: true,
|
||||
} satisfies ClientConfig);
|
||||
|
||||
this.client =
|
||||
options.client ??
|
||||
new SyncHttpClient(clientConfig, {
|
||||
userAgentProvider: provider,
|
||||
defaultHeaders: { "User-Agent": provider.og() },
|
||||
});
|
||||
}
|
||||
|
||||
async consumeUrl(url: string): Promise<OpenGraphMetadata | null> {
|
||||
try {
|
||||
const response = await this.client.get(url);
|
||||
const html = await response.text();
|
||||
return OpenGraphProvider.consumeHtml(html, url);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
static consumeHtml(html: string, url?: string): OpenGraphMetadata | null {
|
||||
if (!html) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const root = parse(html);
|
||||
const title = pick([
|
||||
extractMeta(root, "og:title"),
|
||||
root.querySelector("title")?.text,
|
||||
]);
|
||||
const description = pick([
|
||||
extractMeta(root, "og:description"),
|
||||
extractMeta(root, "description"),
|
||||
]);
|
||||
const image = pick([
|
||||
extractMeta(root, "og:image"),
|
||||
root.querySelector("img")?.getAttribute("src") ?? null,
|
||||
]);
|
||||
const canonical = pick([
|
||||
extractMeta(root, "og:url"),
|
||||
root.querySelector("link[rel='canonical']")?.getAttribute("href") ?? null,
|
||||
url ?? null,
|
||||
]);
|
||||
|
||||
if (!title && !description && !image && !canonical) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
title,
|
||||
description,
|
||||
image,
|
||||
url: canonical,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { DEFAULT_USER_AGENT, OPEN_GRAPH_USER_AGENT } from "@/constants";
|
||||
|
||||
export class UserAgents {
|
||||
private static readonly USER_AGENTS: string[] = [
|
||||
"Mozilla/5.0 (iPhone; CPU iPhone OS 10_4_8; like Mac OS X) AppleWebKit/603.39 (KHTML, like Gecko) Chrome/52.0.3638.271 Mobile Safari/537.5",
|
||||
@@ -15,16 +17,13 @@ export class UserAgents {
|
||||
private readonly rotate: boolean;
|
||||
private readonly fallback: string;
|
||||
|
||||
constructor(
|
||||
rotate: boolean = true,
|
||||
fallback: string = "Basango/0.1 (+https://github.com/bernard-ng/basango)",
|
||||
) {
|
||||
constructor(rotate: boolean = true, fallback: string = DEFAULT_USER_AGENT) {
|
||||
this.rotate = rotate;
|
||||
this.fallback = fallback;
|
||||
}
|
||||
|
||||
static og(): string {
|
||||
return "facebookexternalhit/1.1";
|
||||
return OPEN_GRAPH_USER_AGENT;
|
||||
}
|
||||
|
||||
get(): string {
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
|
||||
/** One record to persist; arbitrary JSON-serializable shape. */
export interface PersistedRecord {
  [key: string]: unknown;
}

/** Sink for crawled records; close(), when present, flushes pending writes. */
export interface Persistor {
  persist(record: PersistedRecord): Promise<void> | void;
  close?: () => Promise<void> | void;
}

/** Construction options for JsonlPersistor. */
export interface JsonlPersistorOptions {
  directory: string; // created recursively if missing
  sourceId: string; // output file is named `<sourceId><suffix>`
  suffix?: string; // defaults to ".jsonl"
  encoding?: BufferEncoding; // defaults to "utf-8"
}
||||
|
||||
export class JsonlPersistor implements Persistor {
|
||||
private readonly filePath: string;
|
||||
private readonly encoding: BufferEncoding;
|
||||
private pending: Promise<void> = Promise.resolve();
|
||||
private closed = false;
|
||||
|
||||
constructor(options: JsonlPersistorOptions) {
|
||||
const suffix = options.suffix ?? ".jsonl";
|
||||
this.encoding = options.encoding ?? "utf-8";
|
||||
|
||||
fs.mkdirSync(options.directory, { recursive: true });
|
||||
this.filePath = path.join(options.directory, `${options.sourceId}${suffix}`);
|
||||
|
||||
if (!fs.existsSync(this.filePath)) {
|
||||
fs.writeFileSync(this.filePath, "", { encoding: this.encoding });
|
||||
}
|
||||
}
|
||||
|
||||
persist(record: PersistedRecord): Promise<void> {
|
||||
if (this.closed) {
|
||||
return Promise.reject(new Error("Persistor has been closed"));
|
||||
}
|
||||
|
||||
const payload = `${JSON.stringify(record)}\n`;
|
||||
|
||||
this.pending = this.pending.then(async () => {
|
||||
const file = Bun.file(this.filePath);
|
||||
await Bun.write(file, payload, { append: true });
|
||||
});
|
||||
|
||||
return this.pending;
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
this.closed = true;
|
||||
await this.pending;
|
||||
}
|
||||
}
|
||||
|
||||
export type { JsonlPersistorOptions as JsonlOptions };
|
||||
|
||||
@@ -0,0 +1,158 @@
|
||||
import { logger } from "@basango/logger";
|
||||
|
||||
import { PipelineConfigManager } from "@/config";
|
||||
import { JsonlPersistor, Persistor } from "@/persistence";
|
||||
import {
|
||||
AnySourceConfig,
|
||||
ClientConfig,
|
||||
CrawlerConfig,
|
||||
CrawlerConfigSchema,
|
||||
PipelineConfig,
|
||||
SourceKind,
|
||||
} from "@/schema";
|
||||
import { createDateRange } from "@/utils";
|
||||
import { PageRangeSchema, PageRangeSpecSchema } from "@/schema";
|
||||
|
||||
/** Minimal contract a crawler implementation must satisfy. */
export interface CrawlerInstance {
  fetch: () => Promise<void> | void;
  close?: () => Promise<void> | void; // optional resource cleanup
}

/** Everything a crawler factory receives to build an instance. */
export interface CrawlerContext {
  pipeline: PipelineConfig;
  source: AnySourceConfig;
  clientConfig: ClientConfig;
  crawlerConfig: CrawlerConfig;
  persistors: Persistor[];
}

export type CrawlerFactory = (context: CrawlerContext) => CrawlerInstance;

// One factory per source kind; mutated via registerCrawler/clearCrawlerRegistry.
const registry = new Map<SourceKind, CrawlerFactory>();

/** Registers (or replaces) the factory for a given source kind. */
export const registerCrawler = (kind: SourceKind, factory: CrawlerFactory): void => {
  registry.set(kind, factory);
};

/** Empties the registry; intended for test isolation. */
export const clearCrawlerRegistry = (): void => {
  registry.clear();
};

/** Options accepted by runSyncCrawl. */
export interface RunSyncCrawlOptions {
  sourceId: string;
  env?: string; // defaults to "development"
  pageRange?: string | null; // raw spec, validated via PageRangeSpecSchema
  dateRange?: string | null; // raw spec, expanded via createDateRange
  category?: string | null;
  notify?: boolean; // defaults to false
  manager?: PipelineConfigManager; // injectable for tests
  persistFactory?: (context: {
    pipeline: PipelineConfig;
    source: AnySourceConfig;
    resolvedSourceId: string;
  }) => Persistor[]; // overrides the default JSONL persistence
}

// Parses an optional raw page-range spec; undefined when absent/empty.
const resolvePageRange = (spec?: string | null) => {
  if (!spec) return undefined;
  const parsed = PageRangeSpecSchema.parse(spec);
  return PageRangeSchema.parse(parsed);
};

// Builds the validated crawler config from the source plus CLI-style options.
const resolveCrawlerConfig = (
  source: AnySourceConfig,
  options: RunSyncCrawlOptions,
): CrawlerConfig => {
  const page_range = resolvePageRange(options.pageRange);
  const date_range = options.dateRange ? createDateRange(options.dateRange) : undefined;

  return CrawlerConfigSchema.parse({
    source,
    page_range,
    date_range,
    category: options.category ?? undefined,
    notify: options.notify ?? false,
  });
};
|
||||
|
||||
const createPersistors = (
|
||||
context: { pipeline: PipelineConfig; source: AnySourceConfig; sourceId: string },
|
||||
factory?: RunSyncCrawlOptions["persistFactory"],
|
||||
): Persistor[] => {
|
||||
if (factory) {
|
||||
return factory({
|
||||
pipeline: context.pipeline,
|
||||
source: context.source,
|
||||
resolvedSourceId: context.sourceId,
|
||||
});
|
||||
}
|
||||
|
||||
return [
|
||||
new JsonlPersistor({
|
||||
directory: context.pipeline.paths.data,
|
||||
sourceId: context.sourceId,
|
||||
}),
|
||||
];
|
||||
};
|
||||
|
||||
/**
 * Runs one synchronous crawl for a configured source: resolves pipeline and
 * crawler config, builds persistors, instantiates the registered crawler, and
 * guarantees cleanup of persistors and crawler even when fetching fails.
 *
 * @throws Error when the source id is unknown, no crawler is registered for
 *         its kind, or the factory returns an invalid instance; rethrows any
 *         error raised by crawler.fetch().
 */
export const runSyncCrawl = async (options: RunSyncCrawlOptions): Promise<void> => {
  const env = options.env ?? "development";
  const manager = options.manager ?? new PipelineConfigManager({ env });
  const pipeline = manager.get(env);
  manager.setupLogging(pipeline);

  // NOTE(review): assumes `pipeline.sources` exposes a find(sourceId) lookup —
  // confirm the config/schema layer provides it; a plain parsed object would not.
  const source = pipeline.sources.find(options.sourceId);
  if (!source) {
    throw new Error(`Source '${options.sourceId}' not found in configuration`);
  }

  const crawlerConfig = resolveCrawlerConfig(source, options);
  // Prefer the source's own id (the requested id may be an alias).
  const sourceId = source.source_id ?? options.sourceId;
  const persistors = createPersistors({ pipeline, source, sourceId }, options.persistFactory);

  const factory = registry.get(source.source_kind as SourceKind);
  if (!factory) {
    throw new Error(`No crawler registered for source kind '${source.source_kind}'`);
  }

  const context: CrawlerContext = {
    pipeline,
    source,
    clientConfig: pipeline.fetch.client,
    crawlerConfig,
    persistors,
  };

  const crawler = factory(context);
  // Guard against factories that return something without a fetch method.
  if (!crawler || typeof crawler.fetch !== "function") {
    throw new Error("Registered crawler did not return a valid instance");
  }

  try {
    await crawler.fetch();
    logger.info(
      {
        sourceId: options.sourceId,
        kind: source.source_kind,
        env,
      },
      "Synchronous crawl completed",
    );
  } finally {
    // Best-effort cleanup: close failures are logged, never rethrown, so any
    // original fetch error is preserved for the caller.
    for (const persistor of persistors) {
      try {
        await persistor.close?.();
      } catch (error) {
        logger.warn({ error }, "Failed to close persistor");
      }
    }

    if (typeof crawler.close === "function") {
      try {
        await crawler.close();
      } catch (error) {
        logger.warn({ error }, "Failed to close crawler");
      }
    }
  }
};
|
||||
|
||||
Reference in New Issue
Block a user