feat: article clusters

This commit is contained in:
2025-12-03 15:54:38 +02:00
parent 1d062f679b
commit 78c27b8220
20 changed files with 2113 additions and 51 deletions
@@ -0,0 +1,20 @@
-- Migration 0003_categories: introduce canonical article categories and
-- link each article to at most one category.
CREATE TABLE "category" (
"candidates" text[] NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
"description" varchar(512),
"embeddings" jsonb,
"id" uuid PRIMARY KEY NOT NULL,
"name" varchar(255) NOT NULL,
"slug" varchar(255) NOT NULL,
"updated_at" timestamp,
"weight" integer DEFAULT 0 NOT NULL
);
--> statement-breakpoint
-- Link articles to their resolved category; "clustered" marks rows that the
-- classifier has already processed.
ALTER TABLE "article" ADD COLUMN "category_id" uuid;--> statement-breakpoint
ALTER TABLE "article" ADD COLUMN "clustered" boolean DEFAULT false NOT NULL;--> statement-breakpoint
-- Case-insensitive uniqueness on category name and slug (expression indexes).
CREATE UNIQUE INDEX "unq_category_name" ON "category" USING btree (lower((name)::text));--> statement-breakpoint
CREATE UNIQUE INDEX "unq_category_slug" ON "category" USING btree (lower((slug)::text));--> statement-breakpoint
CREATE INDEX "idx_category_weight" ON "category" USING btree ("weight");--> statement-breakpoint
-- Deleting a category detaches its articles rather than deleting them.
ALTER TABLE "article" ADD CONSTRAINT "fk_article_category_id" FOREIGN KEY ("category_id") REFERENCES "public"."category"("id") ON DELETE set null ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "idx_article_category_id" ON "article" USING btree ("category_id");--> statement-breakpoint
CREATE INDEX "idx_article_clustered" ON "article" USING btree ("clustered");
File diff suppressed because it is too large Load Diff
@@ -21,6 +21,13 @@
"tag": "0002_modern_joseph",
"version": "7",
"when": 1763920009482
},
{
"breakpoints": true,
"idx": 3,
"tag": "0003_categories",
"version": "7",
"when": 1764767993880
}
],
"version": "7"
+4
View File
@@ -1,9 +1,12 @@
{
"dependencies": {
"@ai-sdk/google": "^2.0.44",
"@ai-sdk/openai": "^2.0.75",
"@basango/domain": "workspace:*",
"@basango/encryption": "workspace:*",
"@basango/logger": "workspace:*",
"@date-fns/utc": "^2.1.1",
"ai": "^5.0.105",
"date-fns": "catalog:",
"drizzle-orm": "^0.44.7",
"mysql2": "^3.15.3",
@@ -31,6 +34,7 @@
"private": true,
"scripts": {
"clean": "rm -rf .turbo node_modules",
"sync:categories": "bun ./src/synchronizers/categories.ts",
"sync:data": "bun ./src/synchronizers/data.ts",
"sync:tokens": "bun ./src/synchronizers/tokens.ts",
"typecheck": "tsc --noEmit"
+18 -5
View File
@@ -11,12 +11,12 @@ import {
} from "@basango/domain/models";
import { md5 } from "@basango/encryption";
import type { SQL } from "drizzle-orm";
import { count, desc, eq, getTableColumns, sql } from "drizzle-orm";
import { count, desc, eq, getTableColumns, or, sql } from "drizzle-orm";
import * as uuid from "uuid";
import { Database } from "#db/client";
import { getSourceIdByName } from "#db/queries/sources";
import { articles, sources } from "#db/schema";
import { articles, categories, sources } from "#db/schema";
import { CreateArticleParams, GetArticlesParams } from "#db/types/articles";
import { GetDistributionsParams, GetPublicationsParams } from "#db/types/shared";
import {
@@ -41,15 +41,17 @@ export async function createArticle(db: Database, params: CreateArticleParams) {
};
}
const categoryList = params.categories ?? [];
const data = {
...params,
categories: categoryList,
hash: md5(params.link),
readingTime: computeReadingTime(params.body),
sentiment: "neutral" as Sentiment,
sentiment: (params.sentiment ?? "neutral") as Sentiment,
sourceId: await getSourceIdByName(db, params.sourceId),
tokenStatistics: computeTokenStatistics({
body: params.body,
categories: params.categories,
categories: categoryList,
title: params.title,
}),
};
@@ -103,7 +105,14 @@ function buildFilters(params: GetArticlesParams, pagination: PaginationState) {
}
if (params.category) {
filters.push(sql`${params.category} = ANY(${articles.categories})`);
const categoryFilter = or(
eq(categories.slug, params.category),
eq(articles.categoryId, params.category),
);
if (categoryFilter) {
filters.push(categoryFilter);
}
}
if (params.search?.trim()) {
@@ -133,11 +142,15 @@ export async function getArticles(db: Database, params: GetArticlesParams) {
const query = db
.select({
...getTableColumns(articles),
category: {
...getTableColumns(categories),
},
source: {
...getTableColumns(sources),
},
})
.from(articles)
.leftJoin(categories, eq(articles.categoryId, categories.id))
.innerJoin(sources, eq(articles.sourceId, sources.id));
const rows = await applyFilters(query, filters)
+13 -12
View File
@@ -5,7 +5,7 @@ import * as uuid from "uuid";
import { Database } from "#db/client";
import { NotFoundError } from "#db/errors";
import { articles, sources } from "#db/schema";
import { articles, categories, sources } from "#db/schema";
import {
CategoryShare,
CategoryShares,
@@ -144,17 +144,18 @@ export async function getSourceCategoryShares(
): Promise<CategoryShares> {
const data = await db.execute<CategoryShare>(sql`
SELECT
cat AS category,
COUNT(*)::int AS count,
ROUND((COUNT(*)::numeric / SUM(COUNT(*)) OVER ()) * 100, 2) AS percentage
FROM (
SELECT NULLIF(BTRIM(c), '') AS cat
FROM ${articles}
CROSS JOIN LATERAL UNNEST(COALESCE(${articles.categories}, ARRAY[]::text[])) AS c
WHERE ${articles.sourceId} = ${params.id}
) t
WHERE cat IS NOT NULL
GROUP BY cat
${categories.id}::text AS "categoryId",
${categories.slug} AS slug,
${categories.name} AS category,
COUNT(${articles.id})::int AS count,
COALESCE(
ROUND((COUNT(*)::numeric / NULLIF(SUM(COUNT(*)) OVER (), 0)) * 100, 2),
0
)::float AS percentage
FROM ${articles}
JOIN ${categories} ON ${categories.id} = ${articles.categoryId}
WHERE ${articles.sourceId} = ${params.id} AND ${articles.clustered} = true
GROUP BY ${categories.id}, ${categories.slug}, ${categories.name}
ORDER BY count DESC
LIMIT ${params.limit ?? DEFAULT_CATEGORY_SHARES_LIMIT}
`);
+37
View File
@@ -94,11 +94,33 @@ export const sources = pgTable(
],
);
// Canonical article categories (table "category"). `candidates` holds the raw
// source-provided strings that map to this category; `embeddings` is reserved
// for vector-based matching and is nullable. Name and slug are unique
// case-insensitively via expression indexes, matching the 0003 migration.
export const categories = pgTable(
  "category",
  {
    candidates: text().array().notNull(),
    createdAt: timestamp("created_at").defaultNow().notNull(),
    description: varchar({ length: 512 }),
    embeddings: jsonb("embeddings").$type<number[]>(),
    id: uuid().primaryKey().notNull(),
    name: varchar({ length: 255 }).notNull(),
    slug: varchar({ length: 255 }).notNull(),
    updatedAt: timestamp("updated_at"),
    weight: integer().default(0).notNull(),
  },
  (table) => [
    // Case-insensitive uniqueness — indexes on lower(name)/lower(slug).
    uniqueIndex("unq_category_name").using("btree", sql`lower((name)::text)`),
    uniqueIndex("unq_category_slug").using("btree", sql`lower((slug)::text)`),
    index("idx_category_weight").using("btree", table.weight.asc().nullsLast()),
  ],
);
export const articles = pgTable(
"article",
{
body: text().notNull(),
categories: text().array(),
categoryId: uuid("category_id"),
clustered: boolean("clustered").default(false).notNull(),
crawledAt: timestamp("crawled_at").defaultNow().notNull(),
credibility: jsonb("credibility").$type<Credibility>(),
excerpt: varchar({ length: 255 }).generatedAlwaysAs(sql`("left"(body, 200) || '...'::text)`),
@@ -123,6 +145,8 @@ export const articles = pgTable(
"gin",
table.categories.asc().nullsLast().op("array_ops"),
),
index("idx_article_category_id").using("btree", table.categoryId.asc().nullsLast()),
index("idx_article_clustered").using("btree", table.clustered.asc().nullsLast()),
index("gin_article_link_trgm").using("gin", table.link.asc().nullsLast().op("gin_trgm_ops")),
index("gin_article_title_trgm").using("gin", table.title.asc().nullsLast().op("gin_trgm_ops")),
index("gin_article_tsv").using("gin", table.tsv.asc().nullsLast().op("tsvector_ops")),
@@ -133,6 +157,11 @@ export const articles = pgTable(
table.id.desc().nullsFirst(),
),
uniqueIndex("unq_article_hash").using("btree", table.hash.asc().nullsLast()),
foreignKey({
columns: [table.categoryId],
foreignColumns: [categories.id],
name: "fk_article_category_id",
}).onDelete("set null"),
foreignKey({
columns: [table.sourceId],
foreignColumns: [sources.id],
@@ -425,6 +454,10 @@ export const commentRelations = relations(comments, ({ one }) => ({
export const articleRelations = relations(articles, ({ one, many }) => ({
bookmarkArticles: many(bookmarkArticles),
category: one(categories, {
fields: [articles.categoryId],
references: [categories.id],
}),
comments: many(comments),
source: one(sources, {
fields: [articles.sourceId],
@@ -432,6 +465,10 @@ export const articleRelations = relations(articles, ({ one, many }) => ({
}),
}));
// A category owns many articles (inverse side of article.category_id).
export const categoryRelations = relations(categories, ({ many }) => ({
  articles: many(articles),
}));
export const bookmarkArticleRelations = relations(bookmarkArticles, ({ one }) => ({
article: one(articles, {
fields: [bookmarkArticles.articleId],
@@ -0,0 +1,218 @@
import { logger } from "@basango/logger";
import { desc, eq, inArray, sql } from "drizzle-orm";
import { Database } from "#db/client";
import { articles, categories } from "#db/schema";
import { DEFAULT_CATEGORY } from "#domain/constants";
import { Categories } from "#domain/models";
// Full row shape of a canonical category as read from the database.
type CategoryRow = typeof categories.$inferSelect;
// Minimal article projection the classifier needs: the id plus the raw
// category strings crawled from the source feed.
type ArticleCategories = Pick<typeof articles.$inferSelect, "categories" | "id">;
// Running tally for one canonical category while scoring a single article.
type CategoryScore = {
  category: (typeof Categories)[number];
  // Number of distinct normalized source strings that matched this category.
  matches: number;
  // Sum of the category's weight, added once per match.
  score: number;
};

// Maximum number of pending articles processed in one clustering run.
const BATCH_SIZE = 50_000;
// slug -> declaration index in Categories; used as the final tie-breaker.
const CATEGORY_MAP = new Map(Categories.map((category, index) => [category.slug, index]));
// normalized candidate string -> every category claiming that candidate.
const CANDIDATE_MAP = buildCandidateMap();
// NOTE(review): non-null assertion — assumes a category with slug
// DEFAULT_CATEGORY always exists in Categories. If it ever doesn't, this is
// undefined at runtime and fallback lookups silently miss — TODO confirm.
const FALLBACK_CATEGORY = Categories.find((category) => category.slug === DEFAULT_CATEGORY)!;
/**
 * Assigns a canonical category to articles that have not been clustered yet.
 *
 * The classifier matches each article's raw source-provided category strings
 * against the static `Categories` candidate lists and writes the winning
 * category id back onto the article, marking it `clustered`.
 */
export class CategoryClassifier {
  constructor(private readonly db: Database) {}

  /**
   * Runs one clustering pass over pending (non-clustered) articles.
   *
   * @param limit Maximum number of articles to process (default BATCH_SIZE).
   * @returns Counters: articles processed, matched to a category row, and
   *          left unmatched (no category row available, not even fallback).
   */
  async classifyPendingArticles(limit: number = BATCH_SIZE) {
    const canonical = await this.ensureCanonicalCategories();
    if (canonical.size === 0) {
      logger.warn("No canonical categories available for clustering");
      return { matched: 0, processed: 0, unmatched: 0 };
    }
    // Newest articles first so fresh content is categorized earliest.
    const pending = await this.db
      .select({
        categories: articles.categories,
        id: articles.id,
      })
      .from(articles)
      .where(eq(articles.clustered, false))
      .orderBy(desc(articles.publishedAt), desc(articles.id))
      .limit(limit);
    if (pending.length === 0) {
      logger.info("No articles to cluster");
      return { matched: 0, processed: 0, unmatched: 0 };
    }
    let matched = 0;
    let unmatched = 0;
    const fallbackRow = canonical.get(FALLBACK_CATEGORY.slug);
    // Group article ids by their resolved category so we issue one UPDATE per
    // category instead of one round trip per article (a run can cover up to
    // BATCH_SIZE = 50k rows). The `null` bucket covers articles with no
    // category row at all (no match and no fallback row).
    const assignments = new Map<string | null, string[]>();
    for (const article of pending) {
      const best = classifyCategory(article);
      const targetRow = canonical.get(best.category.slug) ?? fallbackRow;
      const key = targetRow?.id ?? null;
      const bucket = assignments.get(key) ?? [];
      bucket.push(article.id);
      assignments.set(key, bucket);
      if (targetRow) {
        matched++;
        logger.debug(
          {
            articleId: article.id,
            category: best.category.slug,
            matches: best.matches,
            score: best.score,
          },
          "Clustered article",
        );
      } else {
        unmatched++;
        logger.debug({ articleId: article.id }, "No category match found");
      }
    }
    // Batched write-back: articles are always marked clustered, even when no
    // category row could be resolved (categoryId stays null), so they are not
    // reprocessed on the next run.
    for (const [categoryId, ids] of assignments) {
      await this.db
        .update(articles)
        .set({
          categoryId,
          clustered: true,
          updatedAt: sql`now()`,
        })
        .where(inArray(articles.id, ids));
    }
    const processed = pending.length;
    logger.info({ matched, processed, unmatched }, "Category clustering run completed");
    return { matched, processed, unmatched };
  }

  /**
   * Makes sure every static canonical category exists in the database
   * (insert-ignore on conflict), then returns the persisted rows keyed by
   * slug. Rows already in the database win over the static definitions.
   */
  private async ensureCanonicalCategories(): Promise<Map<string, CategoryRow>> {
    const payload = Categories.map(
      (category) =>
        ({
          candidates: category.candidates,
          description: category.description ?? null,
          embeddings: null,
          id: category.id,
          name: category.name,
          slug: category.slug,
          weight: category.weight,
        }) satisfies typeof categories.$inferInsert,
    );
    await this.db.insert(categories).values(payload).onConflictDoNothing();
    const existing = await this.db.query.categories.findMany({
      where: inArray(
        categories.slug,
        Categories.map((category) => category.slug),
      ),
    });
    const map = new Map<string, CategoryRow>();
    for (const row of existing) {
      map.set(row.slug, row);
    }
    if (!map.has(FALLBACK_CATEGORY.slug)) {
      logger.warn("Fallback main category is missing from canonical categories");
    }
    return map;
  }
}
/**
 * Picks the best canonical category for an article from its raw source
 * category strings. Each distinct normalized string is matched against the
 * candidate index; ties are broken by score, then category weight, then match
 * count, then declaration order in `Categories`. Falls back to
 * FALLBACK_CATEGORY when nothing matches.
 */
function classifyCategory(article: ArticleCategories): CategoryScore {
  // True when `candidate` should replace `incumbent` as the current winner.
  const beats = (candidate: CategoryScore, incumbent: CategoryScore): boolean => {
    if (candidate.score !== incumbent.score) {
      return candidate.score > incumbent.score;
    }
    if (candidate.category.weight !== incumbent.category.weight) {
      return candidate.category.weight > incumbent.category.weight;
    }
    if (candidate.matches !== incumbent.matches) {
      return candidate.matches > incumbent.matches;
    }
    const incumbentOrder = CATEGORY_MAP.get(incumbent.category.slug) ?? Number.MAX_SAFE_INTEGER;
    const candidateOrder = CATEGORY_MAP.get(candidate.category.slug) ?? Number.MAX_SAFE_INTEGER;
    return candidateOrder < incumbentOrder;
  };

  // Score each category, counting every distinct normalized string once.
  const seen = new Set<string>();
  const scores = new Map<string, CategoryScore>();
  for (const raw of article.categories ?? []) {
    const normalized = normalizeCategory(raw);
    if (!normalized || seen.has(normalized)) continue;
    seen.add(normalized);
    const matchedCategories = CANDIDATE_MAP.get(normalized);
    if (!matchedCategories) continue;
    for (const category of matchedCategories) {
      const entry = scores.get(category.slug);
      if (entry) {
        entry.matches += 1;
        entry.score += category.weight;
      } else {
        scores.set(category.slug, { category, matches: 1, score: category.weight });
      }
    }
  }

  let best: CategoryScore | undefined;
  for (const candidate of scores.values()) {
    if (best === undefined || beats(candidate, best)) {
      best = candidate;
    }
  }
  return best ?? { category: FALLBACK_CATEGORY, matches: 0, score: 0 };
}
/**
 * Builds the reverse index from normalized candidate string to the categories
 * claiming it. A candidate may be shared by several categories; each category
 * appears at most once per candidate.
 */
function buildCandidateMap(): Map<string, (typeof Categories)[number][]> {
  const index = new Map<string, (typeof Categories)[number][]>();
  for (const category of Categories) {
    for (const rawCandidate of category.candidates) {
      const key = normalizeCategory(rawCandidate);
      if (key === null) continue;
      const bucket = index.get(key);
      if (bucket === undefined) {
        index.set(key, [category]);
      } else if (bucket.every((entry) => entry.slug !== category.slug)) {
        bucket.push(category);
      }
    }
  }
  return index;
}
/**
 * Normalizes a raw category string for matching: trims, strips diacritics
 * (NFD + combining-mark removal), lowercases, and collapses every run of
 * non-alphanumeric characters into a single space. Returns null when the
 * input is absent or normalizes to the empty string.
 */
export function normalizeCategory(value?: string | null): string | null {
  if (value == null) {
    return null;
  }
  const stripped = value
    .trim()
    .normalize("NFD")
    .replace(/\p{Diacritic}/gu, "");
  const slug = stripped
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, " ")
    .trim()
    .replace(/\s+/g, " ");
  return slug || null;
}
@@ -0,0 +1,18 @@
#!/usr/bin/env bun
import { logger } from "@basango/logger";
import { connectDb } from "#db/client";
import { CategoryClassifier } from "#db/services/category-classifier.js";
async function main() {
const db = await connectDb();
const service = new CategoryClassifier(db);
await service.classifyPendingArticles();
}
main().catch((error) => {
logger.error({ error }, "Category clustering failed");
process.exit(1);
});
+3 -1
View File
@@ -1,9 +1,11 @@
import { ArticleMetadata, ID, Sentiment, TokenStatistics } from "@basango/domain/models";
export type CreateArticleParams = {
categoryId?: string | null;
clustered?: boolean;
title: string;
body: string;
categories: string[];
categories?: string[];
link: string;
sourceId: string;
publishedAt: Date;
+2
View File
@@ -1,9 +1,11 @@
import { DateRange, ID } from "@basango/domain/models";
export type CategoryShare = {
categoryId: string;
category: string;
count: number;
percentage: number;
slug: string;
};
export type CategoryShares = {
+11 -12
View File
@@ -28,21 +28,20 @@ export const computeTokenCount = (
/**
 * Computes per-field token counts for an article and their total.
 *
 * `categories` is optional; a missing list is treated as empty so the
 * category token count is 0 instead of throwing. The excerpt is counted over
 * the first 200 characters of the body, mirroring the generated column.
 *
 * NOTE: the rendered span interleaved the pre- and post-change bodies of this
 * function; this is the resolved post-change version.
 */
export const computeTokenStatistics = (data: {
  title: string;
  body: string;
  categories?: string[];
}): TokenStatistics => {
  const normalizedCategories = data.categories ?? [];
  const titleTokens = computeTokenCount(data.title);
  const bodyTokens = computeTokenCount(data.body);
  const categoryTokens = computeTokenCount(normalizedCategories.join(","));
  const excerptTokens = computeTokenCount(data.body.substring(0, 200));
  return {
    body: bodyTokens,
    categories: categoryTokens,
    excerpt: excerptTokens,
    title: titleTokens,
    total: titleTokens + bodyTokens + categoryTokens + excerptTokens,
  };
};