feat(db): add tokens sync

2025-11-12 11:54:47 +02:00
parent 495e3ea996
commit b8b2a15ee9
5 changed files with 169 additions and 73 deletions
@@ -0,0 +1,442 @@
#!/usr/bin/env bun
import type { RowDataPacket } from "mysql2/promise";
import { Pool, PoolClient } from "pg";
import { env } from "@/config";
import { computeReadingTime } from "@/utils/computed";
type SourceOptions = {
host: string;
user: string;
password: string;
database: string;
};
type TargetOptions = {
database: string;
batchSize?: number;
pageSize?: number;
ignoreColumns?: Record<string, string[]>;
};
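// Source columns dropped on import: the bias/reliability/transparency triple is
// folded into the JSONB credibility column instead, and the remaining columns
// are simply not carried over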
const DEFAULT_IGNORE: Record<string, string[]> = {
article: ["tsv", "image", "excerpt", "bias", "reliability", "transparency"],
source: ["bias", "reliability", "transparency"],
};
class Engine {
private readonly target: Pool;
private readonly ignore: Record<string, string[]>;
private readonly pageSize: number;
private readonly batchSize: number;
constructor(
private readonly sourceOptions: SourceOptions,
private readonly targetOptions: TargetOptions,
) {
this.target = new Pool({
allowExitOnIdle: true,
connectionString: this.targetOptions.database,
max: 16,
});
this.ignore = { ...DEFAULT_IGNORE, ...(this.targetOptions.ignoreColumns ?? {}) };
this.pageSize = this.targetOptions.pageSize ?? 1000;
this.batchSize = Math.max(1, this.targetOptions.batchSize ?? 50);
console.log(
`Engine initialized with pageSize=${this.pageSize} and batchSize=${this.batchSize}`,
);
}
async close() {
await this.target.end();
}
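// Imports one table end to end: reset the target, then stream rows from
// MySQL and write them into Postgres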
async import(table: string): Promise<number> {
await this.reset(table);
return await this.paste(table, this.copy(table));
}
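// Streams the source table page by page via LIMIT/OFFSET; fine for a one-shot
// migration where the source is effectively frozen during the copy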
private async *copy(table: string): AsyncGenerator<Record<string, unknown>> {
const mysql = await import("mysql2/promise");
const source = mysql.createPool({
database: this.sourceOptions.database,
host: this.sourceOptions.host,
idleTimeout: 180_000_000,
password: this.sourceOptions.password,
port: 3306,
rowsAsArray: false,
user: this.sourceOptions.user,
});
let offset = 0;
const size = this.pageSize;
try {
while (true) {
const [rows] = await source.query<RowDataPacket[]>(
`SELECT * FROM \`${this.escapeBacktick(table)}\` LIMIT ? OFFSET ?`,
[size, offset],
);
if (!rows || rows.length === 0) break;
for (const row of rows) {
yield row as Record<string, unknown>;
}
offset += rows.length;
if (rows.length < size) break;
}
} finally {
try {
await source.end();
} catch {}
}
}
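// Writes streamed rows one at a time inside batched transactions; the INSERT
// statement and column order are derived from the first row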
private async paste(
table: string,
rows: AsyncGenerator<Record<string, unknown>>,
): Promise<number> {
const target = await this.target.connect();
let total = 0;
let inBatch = 0;
let columns: string[] | null = null;
let insertSql = "";
const ignored = this.ignoredColumnsFor(table);
const ignoredSet = new Set(ignored);
try {
for await (const row of rows) {
// Transform first so column discovery and parameter building see the same shape
const transformed = this.transformRowForTarget(table, row);
if (!columns) {
// Derive the insert column order from the first row, minus ignored columns
columns = Object.keys(transformed).filter((c) => !ignoredSet.has(c));
// article and source always get the computed credibility JSONB column,
// even when the source row carried none of its inputs
if (
["article", "source"].includes(this.normalizedName(table)) &&
!columns.includes("credibility")
) {
columns.push("credibility");
}
if (this.normalizedName(table) === "article" && !columns.includes("token_statistics")) {
columns.push("token_statistics");
}
const colsSql = columns.map((c) => this.quote(c)).join(", ");
const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");
insertSql = `INSERT INTO ${this.quote(table)} (${colsSql}) VALUES (${placeholders})`;
await target.query("BEGIN");
}
const params = columns.map((c) => this.valueForColumn(c, transformed));
try {
await target.query(insertSql, params);
} catch (err: unknown) {
// Fallback: coerce all *_at params to now() and retry once. normalizeTimestampValue
// should already prevent this, so this path is purely a safety net.
const msg = String((err as Error)?.message ?? "");
if (msg.includes("invalid input syntax for type timestamp")) {
const fixed = columns!.map((c, i) => (c.endsWith("_at") ? new Date() : params[i]));
await target.query(insertSql, fixed);
} else {
throw err;
}
}
total++;
inBatch++;
if (inBatch >= this.batchSize) {
await target.query("COMMIT");
inBatch = 0;
await target.query("BEGIN");
console.log(`Imported ${total} records into ${table} so far...`);
}
}
// Commit the tail transaction; it may be empty when the final row landed
// exactly on a batch boundary, and committing an empty transaction is harmless
if (columns) {
await target.query("COMMIT");
}
} catch (e) {
await safeRollback(target);
throw e;
} finally {
target.release();
}
return total;
}
private normalizedName(table: string): string {
return table.replaceAll('"', "").replaceAll("`", "").toLowerCase();
}
private ignoredColumnsFor(table: string): string[] {
return this.ignore[this.normalizedName(table)] ?? [];
}
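// Empties the target table before import; replica mode suppresses triggers
// and FK enforcement for the duration of the truncate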
private async reset(table: string) {
const client = await this.target.connect();
try {
await client.query("BEGIN");
await client.query("SET session_replication_role = 'replica'");
await client.query(`TRUNCATE TABLE ${this.quote(table)} RESTART IDENTITY CASCADE`);
await client.query("SET session_replication_role = 'origin'");
await client.query("COMMIT");
console.log(`Reset completed for table ${table}`);
} catch (e) {
await safeRollback(client);
throw e;
} finally {
client.release();
}
}
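// Per-row normalization: canonical UUID strings, valid Date timestamps,
// CSV/JSON strings parsed into arrays, plus per-table computed fields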
private transformRowForTarget(table: string, row: Record<string, unknown>) {
const t = this.normalizedName(table);
const clone: Record<string, unknown> = { ...row };
for (const [key, val] of Object.entries(clone)) {
if (val == null) continue;
if (key === "id" || key.endsWith("_id")) {
clone[key] = this.normalizeUuidValue(val);
continue;
}
if (key.endsWith("_at")) {
clone[key] = this.normalizeTimestampValue(val);
continue;
}
if (key === "categories") {
if (Array.isArray(val)) {
clone[key] = val;
} else if (typeof val === "string") {
const raw = val.trim();
// Try JSON first
if (raw.startsWith("[") && raw.endsWith("]")) {
try {
const parsed = JSON.parse(raw);
if (Array.isArray(parsed)) {
clone[key] = parsed;
continue;
}
} catch {}
}
const parts = raw
.split(",")
.map((s) => s.trim())
.filter(Boolean);
clone[key] = parts.length ? parts : null;
}
}
if (t === "article" && key === "reading_time") {
clone[key] = Math.max(1, computeReadingTime(String(clone.body ?? "")));
}
if (key === "roles") {
if (Array.isArray(val)) {
clone[key] = val;
} else if (typeof val === "string") {
const parts = val
.split(",")
.map((s) => s.trim())
.filter(Boolean);
clone[key] = parts.length ? parts : ["ROLE_USER"];
}
}
}
if (t === "article" || t === "source") {
const bias = clone.bias ?? null;
const reliability = clone.reliability ?? null;
const transparency = clone.transparency ?? null;
if (bias || reliability || transparency) {
clone.credibility = {
bias,
reliability,
transparency,
};
}
}
return clone;
}
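// Maps a transformed row value to a query parameter; object values headed for
// json/jsonb columns are stringified for pg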
private valueForColumn(col: string, row: Record<string, unknown>) {
const v = row[col];
// Pass Date objects directly to pg for timestamp columns
if (col.endsWith("_at") && v instanceof Date) {
return v;
}
if (col === "credibility" && v && typeof v === "object") {
return JSON.stringify(v);
}
if (col === "token_statistics" && v && typeof v === "object") {
return JSON.stringify(v);
}
if (col === "device" && v && typeof v === "object") {
return JSON.stringify(v);
}
if (col === "location" && v && typeof v === "object") {
return JSON.stringify(v);
}
if (col === "roles" && v) {
return v;
}
if (col === "metadata" && v && typeof v === "object") {
return JSON.stringify(v);
}
return v ?? null;
}
private normalizeUuidValue(value: unknown): string {
if (Buffer.isBuffer(value)) {
return bufferToUuid(value);
}
if (typeof value === "string") {
// Already a UUID string or hex; try to format 32-hex into canonical form
const hex = value.replace(/-/g, "").toLowerCase();
if (/^[0-9a-f]{32}$/.test(hex)) {
return (
hex.slice(0, 8) +
"-" +
hex.slice(8, 12) +
"-" +
hex.slice(12, 16) +
"-" +
hex.slice(16, 20) +
"-" +
hex.slice(20)
);
}
return value;
}
return String(value);
}
private normalizeTimestampValue(value: unknown): Date {
// If it's already a Date, ensure it's valid
if (value instanceof Date) {
return Number.isNaN(value.getTime()) ? new Date() : value;
}
// Strings: handle common invalid patterns and attempt safe parsing
if (typeof value === "string") {
const raw = value.trim();
if (
!raw ||
/0000-00-00/.test(raw) ||
/NaN/.test(raw) ||
raw.toLowerCase() === "invalid date"
) {
return new Date();
}
// Normalize MySQL-like 'YYYY-MM-DD HH:MM:SS[.ffffff]' to ISO
let s = raw.replace(" ", "T");
// Reduce microseconds to milliseconds (3 digits) if present
s = s.replace(/\.(\d{3})\d+$/, ".$1");
// Append Z if there is no timezone suffix (anchor the test to the end of the string)
if (!/([zZ]|[+-]\d{2}:?\d{2})$/.test(s)) s += "Z";
const d = new Date(s);
if (!Number.isNaN(d.getTime())) return d;
// Try numeric string as epoch seconds/millis
const n = Number(raw);
if (Number.isFinite(n)) {
const ms = n > 1e12 ? n : n * 1000;
const d2 = new Date(ms);
if (!Number.isNaN(d2.getTime())) return d2;
}
return new Date();
}
// Numbers: treat as epoch seconds/millis
if (typeof value === "number" && Number.isFinite(value)) {
const ms = value > 1e12 ? value : value * 1000;
const d = new Date(ms);
return Number.isNaN(d.getTime()) ? new Date() : d;
}
// Fallback: now
return new Date();
}
private quote(id: string) {
const norm = this.normalizedName(id);
return `"${norm.replaceAll('"', '""')}"`;
}
private escapeBacktick(id: string) {
return id.replaceAll("`", "``");
}
}
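// MySQL BINARY(16) ids arrive as 16-byte Buffers; rebuild the canonical
// 8-4-4-4-12 UUID string from the hex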
function bufferToUuid(buf: Buffer): string {
if (buf.length !== 16) return buf.toString("hex");
const hex = buf.toString("hex");
return (
hex.slice(0, 8) +
"-" +
hex.slice(8, 12) +
"-" +
hex.slice(12, 16) +
"-" +
hex.slice(16, 20) +
"-" +
hex.slice(20)
);
}
async function safeRollback(client: PoolClient) {
try {
await client.query("ROLLBACK");
} catch {}
}
async function main() {
const engine = new Engine(
{
database: env("BASANGO_SOURCE_DATABASE_NAME"),
host: env("BASANGO_SOURCE_DATABASE_HOST"),
password: env("BASANGO_SOURCE_DATABASE_PASS"),
user: env("BASANGO_SOURCE_DATABASE_USER"),
},
{
database: env("BASANGO_DATABASE_URL"),
},
);
try {
const tables = process.argv.slice(2);
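// Default table set; the order presumably matters so that rows referenced by
// article (user, source) are imported first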
if (tables.length === 0) tables.push("user", "source", "article");
for (const t of tables) {
const count = await engine.import(t);
console.log(`Imported ${count} records into ${t} table.`);
}
console.log("Import completed successfully");
process.exit(0);
} finally {
await engine.close();
}
}
main().catch((err) => {
console.error(err?.message ?? err);
process.exit(1);
});
@@ -0,0 +1,131 @@
#!/usr/bin/env bun
import { Pool } from "pg";
import { env } from "@/config";
import { computeTokenStatistics } from "@/utils/computed";
type ArticleRow = {
id: string;
title: string;
body: string;
categories: string[];
};
class Engine {
private readonly db: Pool;
private readonly pageSize: number = 1000;
private readonly batchSize: number = 50;
constructor(private readonly database: string) {
this.db = new Pool({
allowExitOnIdle: true,
connectionString: this.database,
max: 16,
});
console.log(
`Engine initialized with pageSize=${this.pageSize} and batchSize=${this.batchSize}`,
);
}
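// Walks article rows whose token_statistics is NULL, computes statistics per
// row, and applies them in bulk; keyset pagination on id keeps every page
// query cheap regardless of table size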
async synchronize() {
const client = await this.db.connect();
console.log("Starting token statistics computation...");
try {
let cursor: string | null = null; // keyset pagination cursor on id (DESC)
for (;;) {
const params: Array<number | string> = [this.pageSize];
const sql: string = cursor
? `SELECT id, title, body, COALESCE(categories, ARRAY[]::text[]) AS categories
FROM article
WHERE token_statistics IS NULL AND id < $2
ORDER BY id DESC
LIMIT $1`
: `SELECT id, title, body, COALESCE(categories, ARRAY[]::text[]) AS categories
FROM article
WHERE token_statistics IS NULL
ORDER BY id DESC
LIMIT $1`;
if (cursor) params.push(cursor);
const { rows } = await client.query<ArticleRow>(sql, params);
if (rows.length === 0) break;
const ids: string[] = [];
const statistics: string[] = [];
for (const r of rows) {
ids.push(r.id);
statistics.push(
JSON.stringify(
computeTokenStatistics({
body: r.body,
categories: r.categories,
title: r.title,
}),
),
);
}
console.log("Applying updates in transactional sub-batches...");
for (let i = 0; i < ids.length; i += this.batchSize) {
const idsChunk = ids.slice(i, i + this.batchSize);
const statsChunk = statistics.slice(i, i + this.batchSize);
if (idsChunk.length === 0) continue;
try {
await client.query("BEGIN");
await client.query(
`UPDATE article AS a
SET token_statistics = u.token_statistics, updated_at = NOW()
FROM (
SELECT
UNNEST($1::uuid[]) AS id,
(UNNEST($2::text[]))::jsonb AS token_statistics
) AS u
WHERE a.id = u.id`,
[idsChunk, statsChunk],
);
await client.query("COMMIT");
console.log(`Updated batch: size=${idsChunk.length}`);
} catch (e) {
try {
await client.query("ROLLBACK");
} catch {}
throw e;
}
}
// advance cursor to the lowest id in this page (since we order DESC)
cursor = rows[rows.length - 1]!.id;
console.log(`Processed page: updated=${ids.length}`);
console.log(`Advancing cursor to id < ${cursor}`);
}
} finally {
client.release();
}
}
async close() {
await this.db.end();
}
}
async function main() {
const engine = new Engine(env("BASANGO_DATABASE_URL"));
try {
await engine.synchronize();
console.log("Token statistics computation completed successfully");
process.exit(0);
} finally {
await engine.close();
}
}
main().catch((err) => {
console.error(err?.message ?? err);
process.exit(1);
});