feat(db): cursor based pagination

This commit is contained in:
2025-11-14 16:03:56 +02:00
parent c155dafef4
commit 7b01f67358
+133 -31
View File
@@ -20,6 +20,20 @@ type TargetOptions = {
ignoreColumns?: Record<string, string[]>;
};
type CursorEncoding = "string" | "number" | "buffer";
type SyncProgress = {
offset: number;
cursor: string | null;
cursorEncoding: CursorEncoding | null;
};
type SourceRow = {
data: Record<string, unknown>;
cursor: string | null;
cursorEncoding: CursorEncoding | null;
};
const DEFAULT_IGNORE: Record<string, string[]> = {
article: ["tsv", "image", "excerpt", "bias", "reliability", "transparency"],
source: ["bias", "reliability", "transparency"],
@@ -56,19 +70,21 @@ class Engine {
async import(table: string): Promise<number> {
await this.ensureProgressTable();
let startOffset = 0;
let startState: SyncProgress = { cursor: null, cursorEncoding: null, offset: 0 };
if (this.resume) {
startOffset = await this.getProgressOffset(table);
console.log(`Resuming import for ${table} from offset=${startOffset}`);
startState = await this.getProgressState(table);
console.log(
`Resuming import for ${table} from offset=${startState.offset}, cursor=${startState.cursor ?? "null"}`,
);
} else {
await this.reset(table);
await this.setProgressOffset(table, 0);
await this.setProgressState(table, startState);
}
return await this.paste(table, this.copy(table, startOffset), startOffset);
return await this.paste(table, this.copy(table, startState), startState);
}
private async *copy(table: string, startOffset = 0): AsyncGenerator<Record<string, unknown>> {
private async *copy(table: string, state: SyncProgress): AsyncGenerator<SourceRow> {
const mysql = await import("mysql2/promise");
const source = mysql.createPool({
@@ -81,23 +97,49 @@ class Engine {
user: this.sourceOptions.user,
});
let offset = startOffset;
let offset = state.offset ?? 0;
const size = this.pageSize;
let cursorParam = this.decodeCursorValue(state.cursor, state.cursorEncoding);
let useCursor = cursorParam != null;
try {
while (true) {
const [rows] = await source.query<RowDataPacket[]>(
`SELECT * FROM \`${this.escapeBacktick(table)}\` ORDER BY \`id\` LIMIT ? OFFSET ?`,
[size, offset],
);
let sql: string;
let params: unknown[];
if (useCursor && cursorParam != null) {
sql = `SELECT * FROM \`${this.escapeBacktick(table)}\` WHERE \`id\` > ? ORDER BY \`id\` LIMIT ?`;
params = [cursorParam, size];
} else {
sql = `SELECT * FROM \`${this.escapeBacktick(table)}\` ORDER BY \`id\` LIMIT ? OFFSET ?`;
params = [size, offset];
}
const [rows] = await source.query<RowDataPacket[]>(sql, params);
if (!rows || rows.length === 0) break;
for (const row of rows) {
yield row as Record<string, unknown>;
const cursorInfo = this.encodeCursorValue((row as Record<string, unknown>).id);
yield {
cursor: cursorInfo.value,
cursorEncoding: cursorInfo.encoding,
data: row as Record<string, unknown>,
};
}
offset += rows.length;
if (rows.length < size) break;
const last = rows[rows.length - 1] as Record<string, unknown>;
const lastCursor = this.encodeCursorValue(last.id);
const decoded = this.decodeCursorValue(lastCursor.value, lastCursor.encoding);
if (decoded != null) {
cursorParam = decoded;
useCursor = true;
} else if (useCursor) {
cursorParam = null;
useCursor = false;
}
}
} finally {
try {
@@ -108,8 +150,8 @@ class Engine {
private async paste(
table: string,
rows: AsyncGenerator<Record<string, unknown>>,
startOffset = 0,
rows: AsyncGenerator<SourceRow>,
startState: SyncProgress,
): Promise<number> {
const target = await this.target.connect();
let total = 0;
@@ -117,16 +159,19 @@ class Engine {
let columns: string[] | null = null;
let insertSql = "";
let upsertSql = "";
let lastCursor = startState.cursor ?? null;
let lastCursorEncoding = startState.cursorEncoding ?? null;
const ignored = this.ignoredColumnsFor(table);
const ignoredSet = new Set(ignored);
try {
for await (let row of rows) {
for await (const entry of rows) {
const row = entry.data;
const transformed = this.transformRowForTarget(table, row);
if (!columns) {
row = this.transformRowForTarget(table, row);
// Filter ignored columns and build column order
columns = Object.keys(row).filter((c) => !ignoredSet.has(c));
columns = Object.keys(transformed).filter((c) => !ignoredSet.has(c));
// If article target has credibility but source not, include computed credibility
if (
@@ -153,8 +198,6 @@ class Engine {
await target.query("BEGIN");
}
// Row transform and params in column order
const transformed = this.transformRowForTarget(table, row);
const params = columns!.map((c) => this.valueForColumn(c, transformed));
try {
@@ -172,9 +215,21 @@ class Engine {
}
total++;
inBatch++;
if (entry.cursor != null) {
lastCursor = entry.cursor;
lastCursorEncoding = entry.cursorEncoding ?? null;
}
if (inBatch >= this.batchSize) {
await this.setProgressOffset(table, startOffset + total, target);
await this.setProgressState(
table,
{
cursor: lastCursor,
cursorEncoding: lastCursorEncoding,
offset: startState.offset + total,
},
target,
);
await target.query("COMMIT");
inBatch = 0;
await target.query("BEGIN");
@@ -183,7 +238,15 @@ class Engine {
}
if (inBatch > 0) {
await this.setProgressOffset(table, startOffset + total, target);
await this.setProgressState(
table,
{
cursor: lastCursor,
cursorEncoding: lastCursorEncoding,
offset: startState.offset + total,
},
target,
);
await target.query("COMMIT");
}
} catch (e) {
@@ -398,6 +461,30 @@ class Engine {
return new Date();
}
private encodeCursorValue(value: unknown): {
encoding: CursorEncoding | null;
value: string | null;
} {
if (value == null) return { encoding: null, value: null };
if (Buffer.isBuffer(value)) {
return { encoding: "buffer", value: value.toString("hex") };
}
if (typeof value === "number" || typeof value === "bigint") {
return { encoding: "number", value: String(value) };
}
return { encoding: "string", value: String(value) };
}
private decodeCursorValue(value: string | null, encoding: CursorEncoding | null): unknown {
if (!value || !encoding) return null;
if (encoding === "buffer") return Buffer.from(value, "hex");
if (encoding === "number") {
const parsed = Number(value);
return Number.isNaN(parsed) ? null : parsed;
}
return value;
}
private quote(id: string) {
const norm = this.normalizedName(id);
return `"${norm.replaceAll('"', '""')}"`;
@@ -414,38 +501,53 @@ class Engine {
`CREATE TABLE IF NOT EXISTS "__sync_state" (
table_name text PRIMARY KEY,
last_offset integer NOT NULL DEFAULT 0,
last_cursor text,
cursor_encoding text,
updated_at timestamp NOT NULL DEFAULT NOW()
)`,
);
await client.query(`ALTER TABLE "__sync_state" ADD COLUMN IF NOT EXISTS last_cursor text`);
await client.query(
`ALTER TABLE "__sync_state" ADD COLUMN IF NOT EXISTS cursor_encoding text`,
);
} finally {
client.release();
}
}
private async getProgressOffset(table: string): Promise<number> {
private async getProgressState(table: string): Promise<SyncProgress> {
const client = await this.target.connect();
try {
const { rows } = await client.query<{ last_offset: number }>(
`SELECT last_offset FROM "__sync_state" WHERE table_name = $1`,
const { rows } = await client.query<{
cursor_encoding: CursorEncoding | null;
last_cursor: string | null;
last_offset: number;
}>(
`SELECT last_offset, last_cursor, cursor_encoding FROM "__sync_state" WHERE table_name = $1`,
[this.normalizedName(table)],
);
return rows[0]?.last_offset ?? 0;
const row = rows[0];
return {
cursor: row?.last_cursor ?? null,
cursorEncoding: row?.cursor_encoding ?? null,
offset: row?.last_offset ?? 0,
};
} finally {
client.release();
}
}
private async setProgressOffset(
private async setProgressState(
table: string,
offset: number,
state: SyncProgress,
client?: PoolClient,
): Promise<void> {
const run = async (c: PoolClient) => {
await c.query(
`INSERT INTO "__sync_state" (table_name, last_offset, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (table_name) DO UPDATE SET last_offset = EXCLUDED.last_offset, updated_at = NOW()`,
[this.normalizedName(table), offset],
`INSERT INTO "__sync_state" (table_name, last_offset, last_cursor, cursor_encoding, updated_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (table_name) DO UPDATE SET last_offset = EXCLUDED.last_offset, last_cursor = EXCLUDED.last_cursor, cursor_encoding = EXCLUDED.cursor_encoding, updated_at = NOW()`,
[this.normalizedName(table), state.offset, state.cursor, state.cursorEncoding],
);
};
if (client) return run(client);