diff --git a/packages/db/src/synchronizers/data.ts b/packages/db/src/synchronizers/data.ts
index e18f2c4..c5cfedf 100644
--- a/packages/db/src/synchronizers/data.ts
+++ b/packages/db/src/synchronizers/data.ts
@@ -20,6 +20,23 @@ type TargetOptions = {
   ignoreColumns?: Record;
 };
 
+/** How a persisted cursor string is turned back into a query parameter. */
+type CursorEncoding = "string" | "number" | "buffer";
+
+/** Resume point for one table: rows copied so far plus the last copied `id`, if any. */
+type SyncProgress = {
+  offset: number;
+  cursor: string | null;
+  cursorEncoding: CursorEncoding | null;
+};
+
+/** One source row together with its own `id` encoded as a cursor. */
+type SourceRow = {
+  data: Record;
+  cursor: string | null;
+  cursorEncoding: CursorEncoding | null;
+};
+
 const DEFAULT_IGNORE: Record = {
   article: ["tsv", "image", "excerpt", "bias", "reliability", "transparency"],
   source: ["bias", "reliability", "transparency"],
@@ -56,19 +73,27 @@ class Engine {
   async import(table: string): Promise {
     await this.ensureProgressTable();
 
-    let startOffset = 0;
+    let startState: SyncProgress = { cursor: null, cursorEncoding: null, offset: 0 };
     if (this.resume) {
-      startOffset = await this.getProgressOffset(table);
-      console.log(`Resuming import for ${table} from offset=${startOffset}`);
+      startState = await this.getProgressState(table);
+      console.log(
+        `Resuming import for ${table} from offset=${startState.offset}, cursor=${startState.cursor ?? "null"}`,
+      );
     } else {
       await this.reset(table);
-      await this.setProgressOffset(table, 0);
+      await this.setProgressState(table, startState);
     }
 
-    return await this.paste(table, this.copy(table, startOffset), startOffset);
+    return await this.paste(table, this.copy(table, startState), startState);
   }
 
-  private async *copy(table: string, startOffset = 0): AsyncGenerator> {
+  /**
+   * Streams the rows of `table` from the MySQL source in `id` order, one page at a time.
+   *
+   * Uses `WHERE id > ?` when `state` or the previous page supplies a usable cursor, and
+   * `LIMIT ? OFFSET ?` starting at `state.offset` otherwise.
+   */
+  private async *copy(table: string, state: SyncProgress): AsyncGenerator {
     const mysql = await import("mysql2/promise");
 
     const source = mysql.createPool({
@@ -81,23 +106,51 @@ class Engine {
       user: this.sourceOptions.user,
     });
 
-    let offset = startOffset;
+    let offset = state.offset ?? 0;
     const size = this.pageSize;
 
+    let cursorParam = this.decodeCursorValue(state.cursor, state.cursorEncoding);
+    let useCursor = cursorParam != null;
+
     try {
       while (true) {
-        const [rows] = await source.query(
-          `SELECT * FROM \`${this.escapeBacktick(table)}\` ORDER BY \`id\` LIMIT ? OFFSET ?`,
-          [size, offset],
-        );
+        let sql: string;
+        let params: unknown[];
+        if (useCursor && cursorParam != null) {
+          sql = `SELECT * FROM \`${this.escapeBacktick(table)}\` WHERE \`id\` > ? ORDER BY \`id\` LIMIT ?`;
+          params = [cursorParam, size];
+        } else {
+          sql = `SELECT * FROM \`${this.escapeBacktick(table)}\` ORDER BY \`id\` LIMIT ? OFFSET ?`;
+          params = [size, offset];
+        }
+
+        const [rows] = await source.query(sql, params);
         if (!rows || rows.length === 0) break;
 
         for (const row of rows) {
-          yield row as Record;
+          const cursorInfo = this.encodeCursorValue((row as Record).id);
+          yield {
+            cursor: cursorInfo.value,
+            cursorEncoding: cursorInfo.encoding,
+            data: row as Record,
+          };
         }
 
         offset += rows.length;
         if (rows.length < size) break;
+
+        // Resume the next page after this page's last id; if that id cannot serve as a
+        // cursor, go back to offset paging.
+        const last = rows[rows.length - 1] as Record;
+        const lastCursor = this.encodeCursorValue(last.id);
+        const decoded = this.decodeCursorValue(lastCursor.value, lastCursor.encoding);
+        if (decoded != null) {
+          cursorParam = decoded;
+          useCursor = true;
+        } else if (useCursor) {
+          cursorParam = null;
+          useCursor = false;
+        }
       }
     } finally {
       try {
@@ -108,8 +161,8 @@ class Engine {
 
   private async paste(
     table: string,
-    rows: AsyncGenerator>,
-    startOffset = 0,
+    rows: AsyncGenerator,
+    startState: SyncProgress,
   ): Promise {
     const target = await this.target.connect();
     let total = 0;
@@ -117,16 +170,19 @@ class Engine {
     let columns: string[] | null = null;
     let insertSql = "";
     let upsertSql = "";
+    let lastCursor = startState.cursor ?? null;
+    let lastCursorEncoding = startState.cursorEncoding ?? null;
 
     const ignored = this.ignoredColumnsFor(table);
     const ignoredSet = new Set(ignored);
 
     try {
-      for await (let row of rows) {
+      for await (const entry of rows) {
+        const row = entry.data;
+        const transformed = this.transformRowForTarget(table, row);
         if (!columns) {
-          row = this.transformRowForTarget(table, row);
           // Filter ignored columns and build column order
-          columns = Object.keys(row).filter((c) => !ignoredSet.has(c));
+          columns = Object.keys(transformed).filter((c) => !ignoredSet.has(c));
 
           // If article target has credibility but source not, include computed credibility
           if (
@@ -153,8 +209,6 @@ class Engine {
           await target.query("BEGIN");
         }
 
-        // Row transform and params in column order
-        const transformed = this.transformRowForTarget(table, row);
         const params = columns!.map((c) => this.valueForColumn(c, transformed));
 
         try {
@@ -172,9 +226,22 @@ class Engine {
         }
         total++;
         inBatch++;
+        // Track the newest row that has a cursor; the checkpoints below persist it.
+        if (entry.cursor != null) {
+          lastCursor = entry.cursor;
+          lastCursorEncoding = entry.cursorEncoding ?? null;
+        }
 
         if (inBatch >= this.batchSize) {
-          await this.setProgressOffset(table, startOffset + total, target);
+          await this.setProgressState(
+            table,
+            {
+              cursor: lastCursor,
+              cursorEncoding: lastCursorEncoding,
+              offset: startState.offset + total,
+            },
+            target,
+          );
           await target.query("COMMIT");
           inBatch = 0;
           await target.query("BEGIN");
@@ -183,7 +250,15 @@ class Engine {
       }
 
       if (inBatch > 0) {
-        await this.setProgressOffset(table, startOffset + total, target);
+        await this.setProgressState(
+          table,
+          {
+            cursor: lastCursor,
+            cursorEncoding: lastCursorEncoding,
+            offset: startState.offset + total,
+          },
+          target,
+        );
         await target.query("COMMIT");
       }
     } catch (e) {
@@ -398,6 +473,46 @@ class Engine {
     return new Date();
   }
 
+  /**
+   * Renders a source `id` as text plus a tag saying how to turn the text back into a value.
+   *
+   * Buffers are hex-encoded, numbers and bigints become decimal text, and any other
+   * non-null value goes through `String()`. `null`/`undefined` yields a null cursor.
+   */
+  private encodeCursorValue(value: unknown): {
+    encoding: CursorEncoding | null;
+    value: string | null;
+  } {
+    if (value == null) return { encoding: null, value: null };
+    if (Buffer.isBuffer(value)) {
+      return { encoding: "buffer", value: value.toString("hex") };
+    }
+    if (typeof value === "number" || typeof value === "bigint") {
+      return { encoding: "number", value: String(value) };
+    }
+    return { encoding: "string", value: String(value) };
+  }
+
+  /**
+   * Turns a stored cursor back into a query parameter.
+   *
+   * Returns `null` when the value or encoding is missing/empty or the numeric text does not parse.
+   * Integer text beyond the safe-integer range comes back as a `bigint`: `Number()` would
+   * round it, and a rounded `id > ?` boundary skips or repeats rows.
+   */
+  private decodeCursorValue(value: string | null, encoding: CursorEncoding | null): unknown {
+    if (!value || !encoding) return null;
+    if (encoding === "buffer") return Buffer.from(value, "hex");
+    if (encoding === "number") {
+      const parsed = Number(value);
+      if (Number.isNaN(parsed)) return null;
+      // NOTE(review): confirm how the mysql2 version in use serializes bigint parameters.
+      if (!Number.isSafeInteger(parsed) && /^-?\d+$/.test(value)) return BigInt(value);
+      return parsed;
+    }
+    return value;
+  }
+
   private quote(id: string) {
     const norm = this.normalizedName(id);
     return `"${norm.replaceAll('"', '""')}"`;
@@ -414,38 +529,62 @@ class Engine {
         `CREATE TABLE IF NOT EXISTS "__sync_state" (
           table_name text PRIMARY KEY,
           last_offset integer NOT NULL DEFAULT 0,
+          last_cursor text,
+          cursor_encoding text,
           updated_at timestamp NOT NULL DEFAULT NOW()
         )`,
       );
+      // CREATE TABLE IF NOT EXISTS leaves an existing table untouched, so add the cursor columns in place.
+      await client.query(`ALTER TABLE "__sync_state" ADD COLUMN IF NOT EXISTS last_cursor text`);
+      await client.query(
+        `ALTER TABLE "__sync_state" ADD COLUMN IF NOT EXISTS cursor_encoding text`,
+      );
     } finally {
       client.release();
     }
   }
 
-  private async getProgressOffset(table: string): Promise {
+  /**
+   * Loads the saved resume point for `table` from `__sync_state`.
+   * A table with no saved row yields offset 0 and a null cursor.
+   */
+  private async getProgressState(table: string): Promise {
     const client = await this.target.connect();
     try {
-      const { rows } = await client.query<{ last_offset: number }>(
-        `SELECT last_offset FROM "__sync_state" WHERE table_name = $1`,
+      const { rows } = await client.query<{
+        cursor_encoding: CursorEncoding | null;
+        last_cursor: string | null;
+        last_offset: number;
+      }>(
+        `SELECT last_offset, last_cursor, cursor_encoding FROM "__sync_state" WHERE table_name = $1`,
         [this.normalizedName(table)],
       );
-      return rows[0]?.last_offset ?? 0;
+      const row = rows[0];
+      return {
+        cursor: row?.last_cursor ?? null,
+        cursorEncoding: row?.cursor_encoding ?? null,
+        offset: row?.last_offset ?? 0,
+      };
     } finally {
       client.release();
     }
   }
 
-  private async setProgressOffset(
+  /**
+   * Upserts the resume point for `table` into `__sync_state`.
+   * Runs on `client` when one is passed, so the write can share the caller's transaction.
+   */
+  private async setProgressState(
     table: string,
-    offset: number,
+    state: SyncProgress,
     client?: PoolClient,
   ): Promise {
     const run = async (c: PoolClient) => {
       await c.query(
-        `INSERT INTO "__sync_state" (table_name, last_offset, updated_at)
-        VALUES ($1, $2, NOW())
-        ON CONFLICT (table_name) DO UPDATE SET last_offset = EXCLUDED.last_offset, updated_at = NOW()`,
-        [this.normalizedName(table), offset],
+        `INSERT INTO "__sync_state" (table_name, last_offset, last_cursor, cursor_encoding, updated_at)
+        VALUES ($1, $2, $3, $4, NOW())
+        ON CONFLICT (table_name) DO UPDATE SET last_offset = EXCLUDED.last_offset, last_cursor = EXCLUDED.last_cursor, cursor_encoding = EXCLUDED.cursor_encoding, updated_at = NOW()`,
+        [this.normalizedName(table), state.offset, state.cursor, state.cursorEncoding],
       );
     };
     if (client) return run(client);