diff --git a/basango/apps/crawler/package.json b/basango/apps/crawler/package.json
index ef35b98..a92c5ef 100644
--- a/basango/apps/crawler/package.json
+++ b/basango/apps/crawler/package.json
@@ -15,6 +15,7 @@
     "bullmq": "^4.17.0",
     "date-fns": "catalog:",
     "ioredis": "^5.3.2",
+    "node-html-parser": "^6.1.10",
     "tiktoken": "^1.0.14",
     "zod": "catalog:"
   }
diff --git a/basango/apps/crawler/src/__tests__/crawler.test.ts b/basango/apps/crawler/src/__tests__/crawler.test.ts
new file mode 100644
index 0000000..039370f
--- /dev/null
+++ b/basango/apps/crawler/src/__tests__/crawler.test.ts
@@ -0,0 +1,91 @@
+import { describe, expect, it, beforeEach, vi } from "vitest";
+
+import { PipelineConfigManager } from "@/config";
+import { registerCrawler, clearCrawlerRegistry, runSyncCrawl } from "@/process/crawler";
+import { PipelineConfigSchema, SourceKindSchema } from "@/schema";
+
+const createPipeline = () =>
+  PipelineConfigSchema.parse({
+    paths: {
+      root: ".",
+      data: ".",
+      logs: ".",
+      configs: ".",
+    },
+    sources: {
+      html: [
+        {
+          source_id: "demo",
+          source_url: "https://example.com",
+          source_kind: SourceKindSchema.enum.html,
+          pagination_template: "/page/{page}",
+        },
+      ],
+      wordpress: [],
+    },
+  });
+
+describe("runSyncCrawl", () => {
+  beforeEach(() => {
+    clearCrawlerRegistry();
+  });
+
+  it("invokes registered crawler factory", async () => {
+    const pipeline = createPipeline();
+    const fetch = vi.fn().mockResolvedValue(undefined);
+    const close = vi.fn();
+
+    registerCrawler(SourceKindSchema.enum.html, () => ({ fetch, close }));
+
+    const manager = {
+      get: vi.fn().mockReturnValue(pipeline),
+      setupLogging: vi.fn(),
+    } as unknown as PipelineConfigManager;
+
+    const persistClose = vi.fn();
+    const persistFactory = vi.fn().mockReturnValue([
+      { persist: vi.fn(), close: persistClose },
+    ]);
+
+    await runSyncCrawl({
+      sourceId: "demo",
+      env: "test",
+      manager,
+      persistFactory,
+    });
+
+    expect(fetch).toHaveBeenCalledTimes(1);
+    expect(close).toHaveBeenCalledTimes(1);
+    expect(persistFactory).toHaveBeenCalledWith({
+      pipeline,
+      source: pipeline.sources.html[0],
+      resolvedSourceId: "demo",
+    });
+    expect(persistClose).toHaveBeenCalledTimes(1);
+  });
+
+  it("throws when source is missing", async () => {
+    const pipeline = createPipeline();
+    registerCrawler(SourceKindSchema.enum.html, () => ({ fetch: vi.fn() }));
+    const manager = {
+      get: vi.fn().mockReturnValue(pipeline),
+      setupLogging: vi.fn(),
+    } as unknown as PipelineConfigManager;
+
+    await expect(
+      runSyncCrawl({ sourceId: "unknown", manager }),
+    ).rejects.toThrow("Source 'unknown' not found");
+  });
+
+  it("throws when no crawler registered", async () => {
+    const pipeline = createPipeline();
+    const manager = {
+      get: vi.fn().mockReturnValue(pipeline),
+      setupLogging: vi.fn(),
+    } as unknown as PipelineConfigManager;
+
+    await expect(
+      runSyncCrawl({ sourceId: "demo", manager }),
+    ).rejects.toThrow("No crawler registered");
+  });
+});
+describe("SyncHttpClient", () => { + it("retries transient statuses", async () => { + const config = createConfig(); + const sleep = vi.fn().mockResolvedValue(undefined); + const fetchMock = vi + .fn() + .mockResolvedValueOnce(new Response("retry", { status: 503 })) + .mockResolvedValueOnce(new Response("ok", { status: 200, body: "done" })); + + const client = new SyncHttpClient(config, { fetchImpl: fetchMock, sleep }); + const response = await client.get("https://example.com"); + + expect(await response.text()).toBe("done"); + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(sleep).toHaveBeenCalled(); + }); + + it("respects retry-after header", async () => { + const config = createConfig(); + const sleep = vi.fn().mockResolvedValue(undefined); + const fetchMock = vi + .fn() + .mockResolvedValueOnce( + new Response("retry", { status: 503, headers: { "Retry-After": "3" } }), + ) + .mockResolvedValueOnce(new Response("ok", { status: 200 })); + + const client = new SyncHttpClient(config, { fetchImpl: fetchMock, sleep }); + await client.get("https://example.com"); + + expect(sleep).toHaveBeenCalledWith(3000); + }); + + it("throws http error on non transient failure", async () => { + const config = createConfig(); + const fetchMock = vi + .fn() + .mockResolvedValueOnce(new Response("bad", { status: 404, statusText: "Not Found" })); + + const client = new SyncHttpClient(config, { fetchImpl: fetchMock }); + + await expect(client.get("https://example.com")) + .rejects.toBeInstanceOf(HttpError); + }); + + it("sends json payload and query params", async () => { + const config = createConfig(); + const fetchMock = vi + .fn() + .mockResolvedValue(new Response("ok", { status: 200 })); + + const client = new SyncHttpClient(config, { fetchImpl: fetchMock }); + await client.post("https://example.com/api", { + params: { page: 1, q: "news" }, + json: { hello: "world" }, + headers: { Authorization: "token" }, + }); + + expect(fetchMock).toHaveBeenCalledTimes(1); + const [url, init] = fetchMock.mock.calls[0]!; + expect(url).toBe("https://example.com/api?page=1&q=news"); + expect(init?.method).toBe("POST"); + expect(init?.body).toBe(JSON.stringify({ hello: "world" })); + expect((init?.headers as Record)["Authorization"]).toBe("token"); + expect((init?.headers as Record)["Content-Type"]).toBe( + "application/json", + ); + }); +}); diff --git a/basango/apps/crawler/src/__tests__/open-graph.test.ts b/basango/apps/crawler/src/__tests__/open-graph.test.ts new file mode 100644 index 0000000..d3e51e7 --- /dev/null +++ b/basango/apps/crawler/src/__tests__/open-graph.test.ts @@ -0,0 +1,49 @@ +import { describe, expect, it, vi } from "vitest"; + +import { OpenGraphProvider } from "@/http/open-graph"; + +const sampleHtml = ` + + + + Example Article + + + + + + + + + + +`; + +describe("OpenGraphProvider", () => { + it("extracts metadata from html", () => { + const metadata = OpenGraphProvider.consumeHtml(sampleHtml, "https://example.com"); + + expect(metadata).toEqual({ + title: "Open Graph Title", + description: "Summary", + image: "https://cdn.example.com/image.jpg", + url: "https://example.com/article", + }); + }); + + it("falls back to null when no metadata present", () => { + const empty = OpenGraphProvider.consumeHtml(""); + expect(empty).toBeNull(); + }); + + it("fetches metadata from url", async () => { + const response = new Response(sampleHtml, { status: 200 }); + const get = vi.fn().mockResolvedValue(response); + + const provider = new OpenGraphProvider({ client: { get } }); + const metadata = await 
provider.consumeUrl("https://example.com/article"); + + expect(get).toHaveBeenCalledWith("https://example.com/article"); + expect(metadata?.title).toBe("Open Graph Title"); + }); +}); diff --git a/basango/apps/crawler/src/__tests__/persistence.test.ts b/basango/apps/crawler/src/__tests__/persistence.test.ts new file mode 100644 index 0000000..05d9e44 --- /dev/null +++ b/basango/apps/crawler/src/__tests__/persistence.test.ts @@ -0,0 +1,27 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import { JsonlPersistor } from "@/persistence"; + +describe("JsonlPersistor", () => { + it("writes json lines sequentially", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "jsonl-test-")); + const persistor = new JsonlPersistor({ directory: tempDir, sourceId: "demo" }); + + await Promise.all([ + persistor.persist({ id: 1, title: "first" }), + persistor.persist({ id: 2, title: "second" }), + ]); + + await persistor.close(); + + const contents = fs.readFileSync(path.join(tempDir, "demo.jsonl"), "utf-8"); + const lines = contents.trim().split("\n").map((line) => JSON.parse(line)); + + expect(lines).toContainEqual({ id: 1, title: "first" }); + expect(lines).toContainEqual({ id: 2, title: "second" }); + }); +}); diff --git a/basango/apps/crawler/src/constants.ts b/basango/apps/crawler/src/constants.ts index 79e2486..0c4e0e1 100644 --- a/basango/apps/crawler/src/constants.ts +++ b/basango/apps/crawler/src/constants.ts @@ -5,3 +5,11 @@ export const DEFAULT_CONFIG_FILES = [ path.join(process.cwd(), "config", "pipeline.json"), path.join(process.cwd(), "pipeline.json"), ]; + +export const DEFAULT_USER_AGENT = + "Basango/0.1 (+https://github.com/bernard-ng/basango)"; +export const OPEN_GRAPH_USER_AGENT = "facebookexternalhit/1.1"; + +export const TRANSIENT_HTTP_STATUSES = [429, 500, 502, 503, 504] as const; + +export const DEFAULT_RETRY_AFTER_HEADER = "retry-after"; diff --git a/basango/apps/crawler/src/http/http-client.ts b/basango/apps/crawler/src/http/http-client.ts index e69de29..79b3c75 100644 --- a/basango/apps/crawler/src/http/http-client.ts +++ b/basango/apps/crawler/src/http/http-client.ts @@ -0,0 +1,224 @@ +import { setTimeout as delay } from "node:timers/promises"; + +import type { ClientConfig } from "@/schema"; +import { DEFAULT_RETRY_AFTER_HEADER, DEFAULT_USER_AGENT, TRANSIENT_HTTP_STATUSES } from "@/constants"; +import { UserAgents } from "@/http/user-agent"; + +export type HttpHeaders = Record; +export type HttpParams = Record; +export type HttpData = unknown; + +export interface HttpClientOptions { + userAgentProvider?: UserAgents; + defaultHeaders?: HttpHeaders; + fetchImpl?: typeof fetch; + sleep?: (ms: number) => Promise; +} + +export interface HttpRequestOptions { + headers?: HttpHeaders; + params?: HttpParams; + data?: HttpData; + json?: HttpData; + retryAfterHeader?: string; +} + +export class HttpError extends Error { + readonly status: number; + readonly response: Response; + + constructor(message: string, response: Response) { + super(message); + this.status = response.status; + this.response = response; + } +} + +const defaultSleep = (ms: number): Promise => { + if (typeof Bun !== "undefined" && typeof Bun.sleep === "function") { + return Bun.sleep(ms); + } + return delay(ms).then(() => undefined); +}; + +const buildUrl = (url: string, params?: HttpParams): string => { + if (!params || Object.keys(params).length === 0) { + return url; + } + + const target = new URL(url); + for 
diff --git a/basango/apps/crawler/src/http/http-client.ts b/basango/apps/crawler/src/http/http-client.ts
index e69de29..79b3c75 100644
--- a/basango/apps/crawler/src/http/http-client.ts
+++ b/basango/apps/crawler/src/http/http-client.ts
@@ -0,0 +1,224 @@
+import { setTimeout as delay } from "node:timers/promises";
+
+import type { ClientConfig } from "@/schema";
+import { DEFAULT_RETRY_AFTER_HEADER, DEFAULT_USER_AGENT, TRANSIENT_HTTP_STATUSES } from "@/constants";
+import { UserAgents } from "@/http/user-agent";
+
+export type HttpHeaders = Record<string, string>;
+export type HttpParams = Record<string, string | number | boolean | null | undefined>;
+export type HttpData = unknown;
+
+export interface HttpClientOptions {
+  userAgentProvider?: UserAgents;
+  defaultHeaders?: HttpHeaders;
+  fetchImpl?: typeof fetch;
+  sleep?: (ms: number) => Promise<void>;
+}
+
+export interface HttpRequestOptions {
+  headers?: HttpHeaders;
+  params?: HttpParams;
+  data?: HttpData;
+  json?: HttpData;
+  retryAfterHeader?: string;
+}
+
+export class HttpError extends Error {
+  readonly status: number;
+  readonly response: Response;
+
+  constructor(message: string, response: Response) {
+    super(message);
+    this.status = response.status;
+    this.response = response;
+  }
+}
+
+const defaultSleep = (ms: number): Promise<void> => {
+  if (typeof Bun !== "undefined" && typeof Bun.sleep === "function") {
+    return Bun.sleep(ms);
+  }
+  return delay(ms).then(() => undefined);
+};
+
+const buildUrl = (url: string, params?: HttpParams): string => {
+  if (!params || Object.keys(params).length === 0) {
+    return url;
+  }
+
+  const target = new URL(url);
+  for (const [key, value] of Object.entries(params)) {
+    if (value === undefined || value === null) continue;
+    target.searchParams.set(key, String(value));
+  }
+
+  return target.toString();
+};
+
+const computeBackoff = (config: ClientConfig, attempt: number): number => {
+  const base = Math.min(
+    config.backoff_initial * Math.pow(config.backoff_multiplier, attempt),
+    config.backoff_max,
+  );
+  const jitter = Math.random() * base * 0.25;
+  return (base + jitter) * 1000;
+};
+
+const parseRetryAfter = (header: string): number => {
+  const numeric = Number.parseInt(header, 10);
+  if (!Number.isNaN(numeric)) {
+    return Math.max(0, numeric * 1000);
+  }
+
+  const parsed = Date.parse(header);
+  if (Number.isNaN(parsed)) {
+    return 0;
+  }
+
+  const delta = parsed - Date.now();
+  return delta > 0 ? delta : 0;
+};
+
+export class BaseHttpClient {
+  protected readonly config: ClientConfig;
+  protected readonly fetchImpl: typeof fetch;
+  protected readonly sleep: (ms: number) => Promise<void>;
+  protected readonly headers: HttpHeaders;
+
+  constructor(config: ClientConfig, options: HttpClientOptions = {}) {
+    this.config = config;
+    const provider =
+      options.userAgentProvider ??
+      new UserAgents(config.rotate, config.user_agent ?? DEFAULT_USER_AGENT);
+    const userAgent = provider.get() ?? config.user_agent ?? DEFAULT_USER_AGENT;
+
+    const baseHeaders: HttpHeaders = { "User-Agent": userAgent };
+    if (options.defaultHeaders) {
+      Object.assign(baseHeaders, options.defaultHeaders);
+    }
+
+    this.headers = baseHeaders;
+    this.fetchImpl = options.fetchImpl ?? fetch;
+    this.sleep = options.sleep ?? defaultSleep;
+  }
+
+  protected buildHeaders(headers?: HttpHeaders): HeadersInit {
+    return { ...this.headers, ...(headers ?? {}) };
+  }
+
+  protected async maybeDelay(
+    attempt: number,
+    response?: Response,
+    retryAfterHeader: string = DEFAULT_RETRY_AFTER_HEADER,
+  ): Promise<void> {
+    let waitMs = 0;
+
+    if (response) {
+      const retryAfter = response.headers.get(retryAfterHeader);
+      if (retryAfter && this.config.respect_retry_after) {
+        waitMs = parseRetryAfter(retryAfter);
+      }
+    }
+
+    if (waitMs === 0) {
+      waitMs = computeBackoff(this.config, attempt);
+    }
+
+    if (waitMs > 0) {
+      await this.sleep(waitMs);
+    }
+  }
+}
"follow" : "manual", + }; + + if (options.json !== undefined) { + init.body = JSON.stringify(options.json); + (init.headers as Record)["Content-Type"] ??= + "application/json"; + } + + const response = await this.fetchImpl(target, init); + + if ( + TRANSIENT_HTTP_STATUSES.includes(response.status as number) && + attempt < this.config.max_retries + ) { + await this.maybeDelay(attempt, response, retryAfterHeader); + attempt += 1; + continue; + } + + if (!response.ok) { + throw new HttpError(`HTTP ${response.status} ${response.statusText}`, response); + } + + return response; + } catch (error) { + if (error instanceof HttpError) { + lastError = error; + throw error; + } + + if (error instanceof DOMException && error.name === "AbortError") { + lastError = error; + if (attempt >= this.config.max_retries) { + throw error; + } + } else { + lastError = error; + if (attempt >= this.config.max_retries) { + throw error; + } + } + + await this.maybeDelay(attempt); + attempt += 1; + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + } + } + + throw lastError instanceof Error + ? lastError + : new Error("HTTP request failed after retries"); + } + + get(url: string, options?: Omit): Promise { + return this.request("GET", url, options); + } + + post(url: string, options: HttpRequestOptions = {}): Promise { + return this.request("POST", url, options); + } +} + +export type HttpClient = SyncHttpClient; diff --git a/basango/apps/crawler/src/http/open-graph.ts b/basango/apps/crawler/src/http/open-graph.ts index e69de29..9aabae8 100644 --- a/basango/apps/crawler/src/http/open-graph.ts +++ b/basango/apps/crawler/src/http/open-graph.ts @@ -0,0 +1,112 @@ +import { parse } from "node-html-parser"; + +import { OPEN_GRAPH_USER_AGENT } from "@/constants"; +import type { ClientConfig } from "@/schema"; +import { SyncHttpClient } from "@/http/http-client"; +import { UserAgents } from "@/http/user-agent"; + +export interface OpenGraphMetadata { + title?: string | null; + description?: string | null; + image?: string | null; + url?: string | null; +} + +export interface OpenGraphProviderOptions { + client?: Pick; + clientConfig?: ClientConfig; + userAgentProvider?: UserAgents; +} + +const pick = (values: Array): string | null => { + for (const value of values) { + if (value && value.trim().length > 0) { + return value.trim(); + } + } + return null; +}; + +const extractMeta = (root: ReturnType, property: string): string | null => { + const selector = `meta[property='${property}'], meta[name='${property}']`; + const node = root.querySelector(selector); + if (!node) { + return null; + } + return node.getAttribute("content") ?? null; +}; + +export class OpenGraphProvider { + private readonly client: Pick; + + constructor(options: OpenGraphProviderOptions = {}) { + const provider = + options.userAgentProvider ?? new UserAgents(false, OPEN_GRAPH_USER_AGENT); + const clientConfig: ClientConfig = + options.clientConfig ?? ({ + timeout: 20, + user_agent: OPEN_GRAPH_USER_AGENT, + follow_redirects: true, + verify_ssl: true, + rotate: false, + max_retries: 2, + backoff_initial: 1, + backoff_multiplier: 2, + backoff_max: 5, + respect_retry_after: true, + } satisfies ClientConfig); + + this.client = + options.client ?? 
diff --git a/basango/apps/crawler/src/http/open-graph.ts b/basango/apps/crawler/src/http/open-graph.ts
index e69de29..9aabae8 100644
--- a/basango/apps/crawler/src/http/open-graph.ts
+++ b/basango/apps/crawler/src/http/open-graph.ts
@@ -0,0 +1,112 @@
+import { parse } from "node-html-parser";
+
+import { OPEN_GRAPH_USER_AGENT } from "@/constants";
+import type { ClientConfig } from "@/schema";
+import { SyncHttpClient } from "@/http/http-client";
+import { UserAgents } from "@/http/user-agent";
+
+export interface OpenGraphMetadata {
+  title?: string | null;
+  description?: string | null;
+  image?: string | null;
+  url?: string | null;
+}
+
+export interface OpenGraphProviderOptions {
+  client?: Pick<SyncHttpClient, "get">;
+  clientConfig?: ClientConfig;
+  userAgentProvider?: UserAgents;
+}
+
+const pick = (values: Array<string | null | undefined>): string | null => {
+  for (const value of values) {
+    if (value && value.trim().length > 0) {
+      return value.trim();
+    }
+  }
+  return null;
+};
+
+const extractMeta = (root: ReturnType<typeof parse>, property: string): string | null => {
+  const selector = `meta[property='${property}'], meta[name='${property}']`;
+  const node = root.querySelector(selector);
+  if (!node) {
+    return null;
+  }
+  return node.getAttribute("content") ?? null;
+};
+
+export class OpenGraphProvider {
+  private readonly client: Pick<SyncHttpClient, "get">;
+
+  constructor(options: OpenGraphProviderOptions = {}) {
+    const provider =
+      options.userAgentProvider ?? new UserAgents(false, OPEN_GRAPH_USER_AGENT);
+    const clientConfig: ClientConfig =
+      options.clientConfig ??
+      ({
+        timeout: 20,
+        user_agent: OPEN_GRAPH_USER_AGENT,
+        follow_redirects: true,
+        verify_ssl: true,
+        rotate: false,
+        max_retries: 2,
+        backoff_initial: 1,
+        backoff_multiplier: 2,
+        backoff_max: 5,
+        respect_retry_after: true,
+      } satisfies ClientConfig);
+
+    this.client =
+      options.client ??
+      new SyncHttpClient(clientConfig, {
+        userAgentProvider: provider,
+        defaultHeaders: { "User-Agent": UserAgents.og() },
+      });
+  }
+
+  async consumeUrl(url: string): Promise<OpenGraphMetadata | null> {
+    try {
+      const response = await this.client.get(url);
+      const html = await response.text();
+      return OpenGraphProvider.consumeHtml(html, url);
+    } catch {
+      return null;
+    }
+  }
+
+  static consumeHtml(html: string, url?: string): OpenGraphMetadata | null {
+    if (!html) {
+      return null;
+    }
+
+    const root = parse(html);
+    const title = pick([
+      extractMeta(root, "og:title"),
+      root.querySelector("title")?.text,
+    ]);
+    const description = pick([
+      extractMeta(root, "og:description"),
+      extractMeta(root, "description"),
+    ]);
+    const image = pick([
+      extractMeta(root, "og:image"),
+      root.querySelector("img")?.getAttribute("src") ?? null,
+    ]);
+    const canonical = pick([
+      extractMeta(root, "og:url"),
+      root.querySelector("link[rel='canonical']")?.getAttribute("href") ?? null,
+      url ?? null,
+    ]);
+
+    if (!title && !description && !image && !canonical) {
+      return null;
+    }
+
+    return {
+      title,
+      description,
+      image,
+      url: canonical,
+    };
+  }
+}
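Reviewer note: a usage sketch for the provider. `consumeUrl` deliberately swallows fetch and parse failures by returning `null`, so callers never need a try/catch; `consumeHtml` is the pure path for pre-fetched markup.

```ts
import { OpenGraphProvider } from "@/http/open-graph";

const provider = new OpenGraphProvider();

// Network path: null on any failure rather than a thrown error.
const remote = await provider.consumeUrl("https://example.com/article");
console.log(remote?.title ?? "no metadata");

// Pure path: falls back to <title>, the first <img>, and the canonical
// link when og:* tags are absent.
const local = OpenGraphProvider.consumeHtml(
  "<html><head><title>Fallback</title></head><body></body></html>",
);
console.log(local?.title); // "Fallback"
```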
"utf-8"; + + fs.mkdirSync(options.directory, { recursive: true }); + this.filePath = path.join(options.directory, `${options.sourceId}${suffix}`); + + if (!fs.existsSync(this.filePath)) { + fs.writeFileSync(this.filePath, "", { encoding: this.encoding }); + } + } + + persist(record: PersistedRecord): Promise { + if (this.closed) { + return Promise.reject(new Error("Persistor has been closed")); + } + + const payload = `${JSON.stringify(record)}\n`; + + this.pending = this.pending.then(async () => { + const file = Bun.file(this.filePath); + await Bun.write(file, payload, { append: true }); + }); + + return this.pending; + } + + async close(): Promise { + this.closed = true; + await this.pending; + } +} + +export type { JsonlPersistorOptions as JsonlOptions }; diff --git a/basango/apps/crawler/src/process/crawler.ts b/basango/apps/crawler/src/process/crawler.ts index e69de29..dfe60d8 100644 --- a/basango/apps/crawler/src/process/crawler.ts +++ b/basango/apps/crawler/src/process/crawler.ts @@ -0,0 +1,158 @@ +import { logger } from "@basango/logger"; + +import { PipelineConfigManager } from "@/config"; +import { JsonlPersistor, Persistor } from "@/persistence"; +import { + AnySourceConfig, + ClientConfig, + CrawlerConfig, + CrawlerConfigSchema, + PipelineConfig, + SourceKind, +} from "@/schema"; +import { createDateRange } from "@/utils"; +import { PageRangeSchema, PageRangeSpecSchema } from "@/schema"; + +export interface CrawlerInstance { + fetch: () => Promise | void; + close?: () => Promise | void; +} + +export interface CrawlerContext { + pipeline: PipelineConfig; + source: AnySourceConfig; + clientConfig: ClientConfig; + crawlerConfig: CrawlerConfig; + persistors: Persistor[]; +} + +export type CrawlerFactory = (context: CrawlerContext) => CrawlerInstance; + +const registry = new Map(); + +export const registerCrawler = (kind: SourceKind, factory: CrawlerFactory): void => { + registry.set(kind, factory); +}; + +export const clearCrawlerRegistry = (): void => { + registry.clear(); +}; + +export interface RunSyncCrawlOptions { + sourceId: string; + env?: string; + pageRange?: string | null; + dateRange?: string | null; + category?: string | null; + notify?: boolean; + manager?: PipelineConfigManager; + persistFactory?: (context: { + pipeline: PipelineConfig; + source: AnySourceConfig; + resolvedSourceId: string; + }) => Persistor[]; +} + +const resolvePageRange = (spec?: string | null) => { + if (!spec) return undefined; + const parsed = PageRangeSpecSchema.parse(spec); + return PageRangeSchema.parse(parsed); +}; + +const resolveCrawlerConfig = ( + source: AnySourceConfig, + options: RunSyncCrawlOptions, +): CrawlerConfig => { + const page_range = resolvePageRange(options.pageRange); + const date_range = options.dateRange ? createDateRange(options.dateRange) : undefined; + + return CrawlerConfigSchema.parse({ + source, + page_range, + date_range, + category: options.category ?? undefined, + notify: options.notify ?? false, + }); +}; + +const createPersistors = ( + context: { pipeline: PipelineConfig; source: AnySourceConfig; sourceId: string }, + factory?: RunSyncCrawlOptions["persistFactory"], +): Persistor[] => { + if (factory) { + return factory({ + pipeline: context.pipeline, + source: context.source, + resolvedSourceId: context.sourceId, + }); + } + + return [ + new JsonlPersistor({ + directory: context.pipeline.paths.data, + sourceId: context.sourceId, + }), + ]; +}; + +export const runSyncCrawl = async (options: RunSyncCrawlOptions): Promise => { + const env = options.env ?? 
"development"; + const manager = options.manager ?? new PipelineConfigManager({ env }); + const pipeline = manager.get(env); + manager.setupLogging(pipeline); + + const source = pipeline.sources.find(options.sourceId); + if (!source) { + throw new Error(`Source '${options.sourceId}' not found in configuration`); + } + + const crawlerConfig = resolveCrawlerConfig(source, options); + const sourceId = source.source_id ?? options.sourceId; + const persistors = createPersistors({ pipeline, source, sourceId }, options.persistFactory); + + const factory = registry.get(source.source_kind as SourceKind); + if (!factory) { + throw new Error(`No crawler registered for source kind '${source.source_kind}'`); + } + + const context: CrawlerContext = { + pipeline, + source, + clientConfig: pipeline.fetch.client, + crawlerConfig, + persistors, + }; + + const crawler = factory(context); + if (!crawler || typeof crawler.fetch !== "function") { + throw new Error("Registered crawler did not return a valid instance"); + } + + try { + await crawler.fetch(); + logger.info( + { + sourceId: options.sourceId, + kind: source.source_kind, + env, + }, + "Synchronous crawl completed", + ); + } finally { + for (const persistor of persistors) { + try { + await persistor.close?.(); + } catch (error) { + logger.warn({ error }, "Failed to close persistor"); + } + } + + if (typeof crawler.close === "function") { + try { + await crawler.close(); + } catch (error) { + logger.warn({ error }, "Failed to close crawler"); + } + } + } +};