diff --git a/packages/db/package.json b/packages/db/package.json index 62c55ae..bbcd8bf 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -27,7 +27,8 @@ "private": true, "scripts": { "clean": "rm -rf .turbo node_modules", - "sync:import": "bun ./src/importer/import.ts", + "sync:data": "bun ./src/synchronizers/data.ts", + "sync:tokens": "bun ./src/synchronizers/tokens.ts", "typecheck": "tsc --noEmit" } } diff --git a/packages/db/src/importer/import.ts b/packages/db/src/importer/import.ts deleted file mode 100644 index d7d22ab..0000000 --- a/packages/db/src/importer/import.ts +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env bun - -import { env } from "@/config"; -import { Engine } from "@/importer"; - -async function main() { - const engine = new Engine( - { - database: env("BASANGO_SOURCE_DATABASE_NAME"), - host: env("BASANGO_SOURCE_DATABASE_HOST"), - password: env("BASANGO_SOURCE_DATABASE_PASS"), - user: env("BASANGO_SOURCE_DATABASE_USER"), - }, - { - database: env("BASANGO_DATABASE_URL"), - }, - ); - - try { - const tables = process.argv.slice(2); - if (tables.length === 0) tables.push("user", "source", "article"); - for (const t of tables) { - const count = await engine.import(t); - console.log(`Imported ${count} records into ${t} table.`); - } - console.log("Import completed successfully"); - process.exit(0); - } finally { - await engine.close(); - } -} - -main().catch((err) => { - console.error(err?.message ?? err); - process.exit(1); -}); diff --git a/packages/db/src/importer/index.ts b/packages/db/src/importer/index.ts deleted file mode 100644 index bd104fd..0000000 --- a/packages/db/src/importer/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from "./engine"; -export * from "./import"; diff --git a/packages/db/src/importer/engine.ts b/packages/db/src/synchronizers/data.ts similarity index 88% rename from packages/db/src/importer/engine.ts rename to packages/db/src/synchronizers/data.ts index d2f3755..178a952 100644 --- a/packages/db/src/importer/engine.ts +++ b/packages/db/src/synchronizers/data.ts @@ -1,6 +1,9 @@ +#!/usr/bin/env bun + import { RowDataPacket } from "mysql2/promise"; import { Pool, PoolClient } from "pg"; +import { env } from "@/config"; import { computeReadingTime } from "@/utils/computed"; type SourceOptions = { @@ -22,31 +25,7 @@ const DEFAULT_IGNORE: Record = { source: ["bias", "reliability", "transparency"], }; -/** - * Engine - * - * Coordinates copying rows from a MySQL source into a PostgreSQL target in a - * controlled, transactional, batched manner. - * - * Responsibilities: - * - Establish and manage a connection pool to the target PostgreSQL database. - * - Stream rows from a MySQL source (via a temporary pool) using pagination. - * - Transform row values to match target expectations (UUID normalization, - * timestamp fallback, array parsing for categories/roles, computed JSON - * credibility, etc.). - * - Filter out ignored columns based on a configurable ignore map. - * - Insert rows into the target in configurable batch sizes with transactional - * commits every batch to limit long-running transactions. - * - Provide a safe reset operation that truncates the target table and manages - * session replication role toggling for Postgres. - * - * @param sourceOptions - connection and authentication options for the MySQL - * source (database, host, user, password, etc.). - * @param targetOptions - configuration for the Postgres target including - * connection string (database), optional pageSize, batchSize and per-table - * ignoreColumns map. - */ -export class Engine { +class Engine { private readonly target: Pool; private readonly ignore: Record; private readonly pageSize: number; @@ -293,15 +272,6 @@ export class Engine { } } - // TODO: this is to heavy for the import process need to find a way to compute on creation or later - // if (t === "article") { - // clone.token_statistics = computeTokenStatistics({ - // body: String(clone.body ?? ""), - // categories: Array.isArray(clone.categories) ? clone.categories : [], - // title: String(clone.title ?? ""), - // }); - // } - return clone; } @@ -438,3 +408,35 @@ async function safeRollback(client: PoolClient) { await client.query("ROLLBACK"); } catch {} } + +async function main() { + const engine = new Engine( + { + database: env("BASANGO_SOURCE_DATABASE_NAME"), + host: env("BASANGO_SOURCE_DATABASE_HOST"), + password: env("BASANGO_SOURCE_DATABASE_PASS"), + user: env("BASANGO_SOURCE_DATABASE_USER"), + }, + { + database: env("BASANGO_DATABASE_URL"), + }, + ); + + try { + const tables = process.argv.slice(2); + if (tables.length === 0) tables.push("user", "source", "article"); + for (const t of tables) { + const count = await engine.import(t); + console.log(`Imported ${count} records into ${t} table.`); + } + console.log("Import completed successfully"); + process.exit(0); + } finally { + await engine.close(); + } +} + +main().catch((err) => { + console.error(err?.message ?? err); + process.exit(1); +}); diff --git a/packages/db/src/synchronizers/tokens.ts b/packages/db/src/synchronizers/tokens.ts new file mode 100644 index 0000000..b28a941 --- /dev/null +++ b/packages/db/src/synchronizers/tokens.ts @@ -0,0 +1,131 @@ +#!/usr/bin/env bun + +import { Pool } from "pg"; + +import { env } from "@/config"; +import { computeTokenStatistics } from "@/utils/computed"; + +type ArticleRow = { + id: string; + title: string; + body: string; + categories: string[]; +}; + +class Engine { + private readonly db: Pool; + private readonly pageSize: number = 1000; + private readonly batchSize: number = 50; + + constructor(private readonly database: string) { + this.db = new Pool({ + allowExitOnIdle: true, + connectionString: this.database, + max: 16, + }); + console.log( + `Engine initialized with pageSize=${this.pageSize} and batchSize=${this.batchSize}`, + ); + } + + async synchronize() { + const client = await this.db.connect(); + console.log("Starting token statistics computation..."); + + try { + let cursor: string | null = null; // keyset pagination cursor on id (DESC) + for (;;) { + const params: Array = [this.pageSize]; + const sql: string = cursor + ? `SELECT id, title, body, COALESCE(categories, ARRAY[]::text[]) AS categories + FROM article + WHERE token_statistics IS NULL AND id < $2 + ORDER BY id DESC + LIMIT $1` + : `SELECT id, title, body, COALESCE(categories, ARRAY[]::text[]) AS categories + FROM article + WHERE token_statistics IS NULL + ORDER BY id DESC + LIMIT $1`; + if (cursor) params.push(cursor); + + const { rows } = await client.query(sql, params); + + if (rows.length === 0) break; + const ids: string[] = []; + const statistics: string[] = []; + + for (const r of rows) { + ids.push(r.id); + statistics.push( + JSON.stringify( + computeTokenStatistics({ + body: r.body, + categories: r.categories, + title: r.title, + }), + ), + ); + } + + console.log(`Apply updates in transactional sub-batches...`); + for (let i = 0; i < ids.length; i += this.batchSize) { + const idsChunk = ids.slice(i, i + this.batchSize); + const statsChunk = statistics.slice(i, i + this.batchSize); + + if (idsChunk.length === 0) continue; + + try { + await client.query("BEGIN"); + await client.query( + `UPDATE article AS a + SET token_statistics = u.token_statistics, updated_at = NOW() + FROM ( + SELECT + UNNEST($1::uuid[]) AS id, + (UNNEST($2::text[]))::jsonb AS token_statistics + ) AS u + WHERE a.id = u.id`, + [idsChunk, statsChunk], + ); + await client.query("COMMIT"); + console.log(`Updated batch: size=${idsChunk.length}`); + } catch (e) { + try { + await client.query("ROLLBACK"); + } catch {} + throw e; + } + } + + // advance cursor to the lowest id in this page (since we order DESC) + cursor = rows[rows.length - 1]!.id; + console.log(`Processed page: updated=${ids.length}`); + console.log(`Advancing cursor to id < ${cursor}`); + } + } finally { + client.release(); + } + } + + async close() { + await this.db.end(); + } +} + +async function main() { + const engine = new Engine(env("BASANGO_DATABASE_URL")); + + try { + await engine.synchronize(); + console.log("Token statistics computation completed successfully"); + process.exit(0); + } finally { + await engine.close(); + } +} + +main().catch((err) => { + console.error(err?.message ?? err); + process.exit(1); +});