feat(db): resumable updates

This commit is contained in:
2025-11-12 17:02:01 +02:00
parent a3f46b6b38
commit b5a24c886a
+86 -9
View File
@@ -18,6 +18,7 @@ type TargetOptions = {
batchSize?: number;
pageSize?: number;
ignoreColumns?: Record<string, string[]>;
resume?: boolean;
};
const DEFAULT_IGNORE: Record<string, string[]> = {
@@ -30,6 +31,7 @@ class Engine {
private readonly ignore: Record<string, string[]>;
private readonly pageSize: number;
private readonly batchSize: number;
private readonly resume: boolean;
constructor(
private readonly sourceOptions: SourceOptions,
@@ -43,8 +45,9 @@ class Engine {
this.ignore = { ...DEFAULT_IGNORE, ...(this.targetOptions.ignoreColumns ?? {}) };
this.pageSize = this.targetOptions.pageSize ?? 1000;
this.batchSize = Math.max(1, this.targetOptions.batchSize ?? 50);
this.resume = !!this.targetOptions.resume;
console.log(
`Engine initialized with pageSize=${this.pageSize} and batchSize=${this.batchSize}`,
`Engine initialized with pageSize=${this.pageSize} and batchSize=${this.batchSize} (resume=${this.resume})`,
);
}
@@ -53,11 +56,21 @@ class Engine {
}
async import(table: string): Promise<number> {
await this.reset(table);
return await this.paste(table, this.copy(table));
await this.ensureProgressTable();
let startOffset = 0;
if (this.resume) {
startOffset = await this.getProgressOffset(table);
console.log(`Resuming import for ${table} from offset=${startOffset}`);
} else {
await this.reset(table);
await this.setProgressOffset(table, 0);
}
return await this.paste(table, this.copy(table, startOffset), startOffset);
}
private async *copy(table: string): AsyncGenerator<Record<string, unknown>> {
private async *copy(table: string, startOffset = 0): AsyncGenerator<Record<string, unknown>> {
const mysql = await import("mysql2/promise");
const source = mysql.createPool({
@@ -70,12 +83,12 @@ class Engine {
user: this.sourceOptions.user,
});
let offset = 0;
let offset = startOffset;
const size = this.pageSize;
try {
while (true) {
const [rows] = await source.query<RowDataPacket[]>(
`SELECT * FROM \`${this.escapeBacktick(table)}\` LIMIT ? OFFSET ?`,
`SELECT * FROM \`${this.escapeBacktick(table)}\` ORDER BY \`id\` LIMIT ? OFFSET ?`,
[size, offset],
);
@@ -98,12 +111,14 @@ class Engine {
private async paste(
table: string,
rows: AsyncGenerator<Record<string, unknown>>,
startOffset = 0,
): Promise<number> {
const target = await this.target.connect();
let total = 0;
let inBatch = 0;
let columns: string[] | null = null;
let insertSql = "";
let upsertSql = "";
const ignored = this.ignoredColumnsFor(table);
const ignoredSet = new Set(ignored);
@@ -131,6 +146,12 @@ class Engine {
const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");
insertSql = `INSERT INTO ${this.quote(table)} (${colsSql}) VALUES (${placeholders})`;
const updateCols = columns.filter((c) => c !== "id");
const setSql = updateCols
.map((c) => `${this.quote(c)} = EXCLUDED.${this.quote(c)}`)
.join(", ");
upsertSql = `${insertSql} ON CONFLICT ("id") DO UPDATE SET ${setSql}`;
await target.query("BEGIN");
}
@@ -139,14 +160,14 @@ class Engine {
const params = columns!.map((c) => this.valueForColumn(c, transformed));
try {
await target.query(insertSql, params);
await target.query(upsertSql, params);
} catch (err: unknown) {
// Fallback: coerce all *_at params to now() and retry once
// This will never happen in production but anyway let's keep it safe
const msg = String((err as Error)?.message ?? "");
if (msg.includes("invalid input syntax for type timestamp")) {
const fixed = columns!.map((c, i) => (c.endsWith("_at") ? new Date() : params[i]));
await target.query(insertSql, fixed);
await target.query(upsertSql, fixed);
} else {
throw err;
}
@@ -155,6 +176,7 @@ class Engine {
inBatch++;
if (inBatch >= this.batchSize) {
await this.setProgressOffset(table, startOffset + total, target);
await target.query("COMMIT");
inBatch = 0;
await target.query("BEGIN");
@@ -163,6 +185,7 @@ class Engine {
}
if (inBatch > 0) {
await this.setProgressOffset(table, startOffset + total, target);
await target.query("COMMIT");
}
} catch (e) {
@@ -385,6 +408,56 @@ class Engine {
private escapeBacktick(id: string) {
return id.replaceAll("`", "``");
}
private async ensureProgressTable() {
const client = await this.target.connect();
try {
await client.query(
`CREATE TABLE IF NOT EXISTS "__sync_state" (
table_name text PRIMARY KEY,
last_offset integer NOT NULL DEFAULT 0,
updated_at timestamp NOT NULL DEFAULT NOW()
)`,
);
} finally {
client.release();
}
}
private async getProgressOffset(table: string): Promise<number> {
const client = await this.target.connect();
try {
const { rows } = await client.query<{ last_offset: number }>(
`SELECT last_offset FROM "__sync_state" WHERE table_name = $1`,
[this.normalizedName(table)],
);
return rows[0]?.last_offset ?? 0;
} finally {
client.release();
}
}
private async setProgressOffset(
table: string,
offset: number,
client?: PoolClient,
): Promise<void> {
const run = async (c: PoolClient) => {
await c.query(
`INSERT INTO "__sync_state" (table_name, last_offset, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (table_name) DO UPDATE SET last_offset = EXCLUDED.last_offset, updated_at = NOW()`,
[this.normalizedName(table), offset],
);
};
if (client) return run(client);
const c = await this.target.connect();
try {
await run(c);
} finally {
c.release();
}
}
}
function bufferToUuid(buf: Buffer): string {
@@ -410,6 +483,10 @@ async function safeRollback(client: PoolClient) {
}
async function main() {
const argv = process.argv.slice(2);
const resume = argv.includes("--resume") || argv.includes("-r");
const tables = argv.filter((a) => !a.startsWith("-"));
const engine = new Engine(
{
database: env("BASANGO_SOURCE_DATABASE_NAME"),
@@ -419,11 +496,11 @@ async function main() {
},
{
database: env("BASANGO_DATABASE_URL"),
resume,
},
);
try {
const tables = process.argv.slice(2);
if (tables.length === 0) tables.push("user", "source", "article");
for (const t of tables) {
const count = await engine.import(t);