From b5a24c886a5f2303fdf8495c751304124f294289 Mon Sep 17 00:00:00 2001 From: bernard-ng Date: Wed, 12 Nov 2025 17:02:01 +0200 Subject: [PATCH] feat(db): resumable updates --- packages/db/src/synchronizers/data.ts | 95 ++++++++++++++++++++++++--- 1 file changed, 86 insertions(+), 9 deletions(-) diff --git a/packages/db/src/synchronizers/data.ts b/packages/db/src/synchronizers/data.ts index 178a952..f1c664b 100644 --- a/packages/db/src/synchronizers/data.ts +++ b/packages/db/src/synchronizers/data.ts @@ -18,6 +18,7 @@ type TargetOptions = { batchSize?: number; pageSize?: number; ignoreColumns?: Record; + resume?: boolean; }; const DEFAULT_IGNORE: Record = { @@ -30,6 +31,7 @@ class Engine { private readonly ignore: Record; private readonly pageSize: number; private readonly batchSize: number; + private readonly resume: boolean; constructor( private readonly sourceOptions: SourceOptions, @@ -43,8 +45,9 @@ class Engine { this.ignore = { ...DEFAULT_IGNORE, ...(this.targetOptions.ignoreColumns ?? {}) }; this.pageSize = this.targetOptions.pageSize ?? 1000; this.batchSize = Math.max(1, this.targetOptions.batchSize ?? 50); + this.resume = !!this.targetOptions.resume; console.log( - `Engine initialized with pageSize=${this.pageSize} and batchSize=${this.batchSize}`, + `Engine initialized with pageSize=${this.pageSize} and batchSize=${this.batchSize} (resume=${this.resume})`, ); } @@ -53,11 +56,21 @@ class Engine { } async import(table: string): Promise { - await this.reset(table); - return await this.paste(table, this.copy(table)); + await this.ensureProgressTable(); + + let startOffset = 0; + if (this.resume) { + startOffset = await this.getProgressOffset(table); + console.log(`Resuming import for ${table} from offset=${startOffset}`); + } else { + await this.reset(table); + await this.setProgressOffset(table, 0); + } + + return await this.paste(table, this.copy(table, startOffset), startOffset); } - private async *copy(table: string): AsyncGenerator> { + private async *copy(table: string, startOffset = 0): AsyncGenerator> { const mysql = await import("mysql2/promise"); const source = mysql.createPool({ @@ -70,12 +83,12 @@ class Engine { user: this.sourceOptions.user, }); - let offset = 0; + let offset = startOffset; const size = this.pageSize; try { while (true) { const [rows] = await source.query( - `SELECT * FROM \`${this.escapeBacktick(table)}\` LIMIT ? OFFSET ?`, + `SELECT * FROM \`${this.escapeBacktick(table)}\` ORDER BY \`id\` LIMIT ? OFFSET ?`, [size, offset], ); @@ -98,12 +111,14 @@ class Engine { private async paste( table: string, rows: AsyncGenerator>, + startOffset = 0, ): Promise { const target = await this.target.connect(); let total = 0; let inBatch = 0; let columns: string[] | null = null; let insertSql = ""; + let upsertSql = ""; const ignored = this.ignoredColumnsFor(table); const ignoredSet = new Set(ignored); @@ -131,6 +146,12 @@ class Engine { const placeholders = columns.map((_, i) => `$${i + 1}`).join(", "); insertSql = `INSERT INTO ${this.quote(table)} (${colsSql}) VALUES (${placeholders})`; + const updateCols = columns.filter((c) => c !== "id"); + const setSql = updateCols + .map((c) => `${this.quote(c)} = EXCLUDED.${this.quote(c)}`) + .join(", "); + upsertSql = `${insertSql} ON CONFLICT ("id") DO UPDATE SET ${setSql}`; + await target.query("BEGIN"); } @@ -139,14 +160,14 @@ class Engine { const params = columns!.map((c) => this.valueForColumn(c, transformed)); try { - await target.query(insertSql, params); + await target.query(upsertSql, params); } catch (err: unknown) { // Fallback: coerce all *_at params to now() and retry once // This will never happen in production but anyway let's keep it safe const msg = String((err as Error)?.message ?? ""); if (msg.includes("invalid input syntax for type timestamp")) { const fixed = columns!.map((c, i) => (c.endsWith("_at") ? new Date() : params[i])); - await target.query(insertSql, fixed); + await target.query(upsertSql, fixed); } else { throw err; } @@ -155,6 +176,7 @@ class Engine { inBatch++; if (inBatch >= this.batchSize) { + await this.setProgressOffset(table, startOffset + total, target); await target.query("COMMIT"); inBatch = 0; await target.query("BEGIN"); @@ -163,6 +185,7 @@ class Engine { } if (inBatch > 0) { + await this.setProgressOffset(table, startOffset + total, target); await target.query("COMMIT"); } } catch (e) { @@ -385,6 +408,56 @@ class Engine { private escapeBacktick(id: string) { return id.replaceAll("`", "``"); } + + private async ensureProgressTable() { + const client = await this.target.connect(); + try { + await client.query( + `CREATE TABLE IF NOT EXISTS "__sync_state" ( + table_name text PRIMARY KEY, + last_offset integer NOT NULL DEFAULT 0, + updated_at timestamp NOT NULL DEFAULT NOW() + )`, + ); + } finally { + client.release(); + } + } + + private async getProgressOffset(table: string): Promise { + const client = await this.target.connect(); + try { + const { rows } = await client.query<{ last_offset: number }>( + `SELECT last_offset FROM "__sync_state" WHERE table_name = $1`, + [this.normalizedName(table)], + ); + return rows[0]?.last_offset ?? 0; + } finally { + client.release(); + } + } + + private async setProgressOffset( + table: string, + offset: number, + client?: PoolClient, + ): Promise { + const run = async (c: PoolClient) => { + await c.query( + `INSERT INTO "__sync_state" (table_name, last_offset, updated_at) + VALUES ($1, $2, NOW()) + ON CONFLICT (table_name) DO UPDATE SET last_offset = EXCLUDED.last_offset, updated_at = NOW()`, + [this.normalizedName(table), offset], + ); + }; + if (client) return run(client); + const c = await this.target.connect(); + try { + await run(c); + } finally { + c.release(); + } + } } function bufferToUuid(buf: Buffer): string { @@ -410,6 +483,10 @@ async function safeRollback(client: PoolClient) { } async function main() { + const argv = process.argv.slice(2); + const resume = argv.includes("--resume") || argv.includes("-r"); + const tables = argv.filter((a) => !a.startsWith("-")); + const engine = new Engine( { database: env("BASANGO_SOURCE_DATABASE_NAME"), @@ -419,11 +496,11 @@ async function main() { }, { database: env("BASANGO_DATABASE_URL"), + resume, }, ); try { - const tables = process.argv.slice(2); if (tables.length === 0) tables.push("user", "source", "article"); for (const t of tables) { const count = await engine.import(t);