Нормализация 100 млн аудиотреков: Многоэтапный пайплайн с LLM

Задача: нормализовать 100 миллионов записей аудиотреков, где пользователи самостоятельно добавляли названия и исполнителей. Нужно выявить канонические версии треков и связать все варианты (включая опечатки, перефразирования и ошибки) с каноническими записями.
Проблема
- 100 млн записей с названиями и исполнителями
- Пользовательский ввод → опечатки, разные форматы, дубликаты
- Нужно: разделить оригиналы и пользовательские варианты, исправить ошибки
Архитектура решения: Многоэтапный пайплайн
Решение использует каскадный подход с тремя слоями обработки: от дешевых и быстрых методов к дорогим, но точным.
Применяемые паттерны проектирования
- Multi-stage Pipeline — многоэтапная обработка данных
- Retrieval Augmented Generation (RAG) — поиск похожих треков через векторные embedding’и
- Incremental Model Querying — пошаговые запросы к LLM для сложных случаев
- Prompt/Response Optimiser — стандартизация формата ответов LLM
Этап 0: Подготовка и анализ
// types.ts
export interface TrackRecord {
id: number;
original_title: string;
original_artist: string;
created_at?: Date;
}
export interface CanonicalTrack {
canonical_id: number;
canonical_title: string;
canonical_artist: string;
normalized_key: string;
embedding?: number[];
}
export interface TrackMapping {
original_record_id: number;
canonical_id: number;
match_type: 'exact_hash' | 'embedding_auto' | 'llm_verified' | 'new_canonical';
match_confidence: number;
verification_reason?: string;
}// 0-analysis.ts
import { TrackRecord } from './types';
export async function analyzeTracks(tracks: TrackRecord[]) {
const stats = {
total: tracks.length,
uniqueTitles: new Set<string>(),
uniqueArtists: new Set<string>(),
caseVariations: 0,
specialChars: 0,
};
for (const track of tracks) {
stats.uniqueTitles.add(track.original_title.toLowerCase());
stats.uniqueArtists.add(track.original_artist.toLowerCase());
// Проверка на вариации регистра
const normalized = track.original_title.toLowerCase().trim();
if (track.original_title !== normalized) {
stats.caseVariations++;
}
// Проверка на спецсимволы
if (/[()\[\]{}!@#$%^&*]/.test(track.original_title)) {
stats.specialChars++;
}
}
console.log('Анализ данных:');
console.log(`Всего записей: ${stats.total}`);
console.log(`Уникальных названий: ${stats.uniqueTitles.size}`);
console.log(`Уникальных исполнителей: ${stats.uniqueArtists.size}`);
console.log(`Вариаций регистра: ${stats.caseVariations}`);
console.log(`Записей со спецсимволами: ${stats.specialChars}`);
return stats;
}Этап 1: Текстовая нормализация и кластеризация
Паттерн: Multi-stage Pipeline (дешевый слой)
// 1-text-normalization.ts
import crypto from 'crypto';
import { TrackRecord, CanonicalTrack } from './types';
export function normalizeText(text: string): string {
return text
.toLowerCase()
.normalize('NFD') // Разбить на базовые символы и диакритики
.replace(/[\u0300-\u036f]/g, '') // Удалить диакритики
.replace(/[()\[\]{}!@#$%^&*"']/g, '') // Удалить спецсимволы
.replace(/\s+/g, ' ') // Нормализовать пробелы
.trim();
}
export function createNormalizedKey(title: string, artist: string): string {
const normalizedTitle = normalizeText(title);
const normalizedArtist = normalizeText(artist);
// Удалить стоп-слова
const stopWords = new Set(['the', 'a', 'an', 'official', 'video', 'remix', 'feat', 'ft']);
const titleWords = normalizedTitle
.split(' ')
.filter(word => !stopWords.has(word))
.join(' ');
return `${titleWords} ${normalizedArtist}`;
}
export function hashNormalizedKey(key: string): string {
return crypto.createHash('sha256').update(key).digest('hex');
}
export interface TrackCluster {
hash: string;
tracks: TrackRecord[];
canonicalCandidate: TrackRecord;
}
export function clusterByHash(tracks: TrackRecord[]): Map<string, TrackCluster> {
const clusters = new Map<string, TrackCluster>();
for (const track of tracks) {
const normalizedKey = createNormalizedKey(track.original_title, track.original_artist);
const hash = hashNormalizedKey(normalizedKey);
if (!clusters.has(hash)) {
clusters.set(hash, {
hash,
tracks: [],
canonicalCandidate: track, // Временно, будет пересчитано
});
}
clusters.get(hash)!.tracks.push(track);
}
// Выбрать канонического кандидата (самый частый или самый ранний)
for (const cluster of clusters.values()) {
// Сортируем по частоте (размер группы) и дате создания
cluster.canonicalCandidate = cluster.tracks
.sort((a, b) => {
// Приоритет: больше треков в группе → раньше создан
return cluster.tracks.length - cluster.tracks.length ||
(a.created_at?.getTime() || 0) - (b.created_at?.getTime() || 0);
})[0];
}
return clusters;
}
export function extractCanonicalCandidates(clusters: Map<string, TrackCluster>): CanonicalTrack[] {
const canonicals: CanonicalTrack[] = [];
let id = 1;
for (const cluster of clusters.values()) {
const candidate = cluster.canonicalCandidate;
canonicals.push({
canonical_id: id++,
canonical_title: candidate.original_title,
canonical_artist: candidate.original_artist,
normalized_key: createNormalizedKey(
candidate.original_title,
candidate.original_artist
),
});
}
return canonicals;
}Этап 2: Векторный поиск по embedding’ам
Паттерн: Retrieval Augmented Generation (RAG)
// 2-vector-search.ts
import { ChromaClient } from 'chromadb';
import OpenAI from 'openai';
import { TrackRecord, CanonicalTrack, TrackMapping } from './types';
export class EmbeddingService {
private openai: OpenAI;
private chroma: ChromaClient;
private collectionName = 'canonical_tracks';
constructor(openaiApiKey: string, chromaUrl: string = 'http://localhost:8000') {
this.openai = new OpenAI({ apiKey: openaiApiKey });
this.chroma = new ChromaClient({ path: chromaUrl });
}
async createEmbedding(text: string): Promise<number[]> {
const response = await this.openai.embeddings.create({
model: 'text-embedding-3-small', // или text-embedding-ada-002
input: text,
});
return response.data[0].embedding;
}
async initializeCollection() {
try {
await this.chroma.deleteCollection({ name: this.collectionName });
} catch (e) {
// Коллекция не существует
}
await this.chroma.createCollection({
name: this.collectionName,
});
}
async indexCanonicalTracks(canonicals: CanonicalTrack[]) {
const collection = await this.chroma.getCollection({ name: this.collectionName });
// Батчинг для эффективности
const batchSize = 100;
for (let i = 0; i < canonicals.length; i += batchSize) {
const batch = canonicals.slice(i, i + batchSize);
const texts = batch.map(c => `${c.canonical_title} - ${c.canonical_artist}`);
const embeddings = await Promise.all(
texts.map(text => this.createEmbedding(text))
);
await collection.add({
ids: batch.map(c => c.canonical_id.toString()),
embeddings: embeddings,
metadatas: batch.map(c => ({
title: c.canonical_title,
artist: c.canonical_artist,
normalized_key: c.normalized_key,
})),
});
console.log(`Проиндексировано ${Math.min(i + batchSize, canonicals.length)}/${canonicals.length}`);
}
}
async findSimilarTracks(
track: TrackRecord,
topK: number = 5
): Promise<Array<{ canonical: CanonicalTrack; similarity: number }>> {
const collection = await this.chroma.getCollection({ name: this.collectionName });
const queryText = `${track.original_title} - ${track.original_artist}`;
const queryEmbedding = await this.createEmbedding(queryText);
const results = await collection.query({
queryEmbeddings: [queryEmbedding],
nResults: topK,
});
const matches: Array<{ canonical: CanonicalTrack; similarity: number }> = [];
if (results.ids && results.ids[0]) {
const ids = results.ids[0];
const distances = results.distances?.[0] || [];
for (let i = 0; i < ids.length; i++) {
const id = parseInt(ids[i]);
const distance = distances[i];
const similarity = 1 - distance; // Косинусное расстояние → сходство
const metadata = results.metadatas?.[0]?.[i];
if (metadata) {
matches.push({
canonical: {
canonical_id: id,
canonical_title: metadata.title as string,
canonical_artist: metadata.artist as string,
normalized_key: metadata.normalized_key as string,
},
similarity,
});
}
}
}
return matches;
}
async matchTracks(
tracks: TrackRecord[],
autoMatchThreshold: number = 0.95
): Promise<{
autoMatched: TrackMapping[];
needsLLMVerification: Array<{ track: TrackRecord; candidates: CanonicalTrack[] }>;
}> {
const autoMatched: TrackMapping[] = [];
const needsLLMVerification: Array<{ track: TrackRecord; candidates: CanonicalTrack[] }> = [];
for (const track of tracks) {
const similar = await this.findSimilarTracks(track, 5);
if (similar.length > 0 && similar[0].similarity >= autoMatchThreshold) {
// Автоматическое сопоставление
autoMatched.push({
original_record_id: track.id,
canonical_id: similar[0].canonical.canonical_id,
match_type: 'embedding_auto',
match_confidence: similar[0].similarity,
});
} else if (similar.length > 0 && similar[0].similarity >= 0.7) {
// Нужна верификация через LLM
needsLLMVerification.push({
track,
candidates: similar.map(s => s.canonical),
});
}
}
return { autoMatched, needsLLMVerification };
}
}Этап 3: Верификация через LLM
Паттерны: Incremental Model Querying, Prompt/Response Optimiser
// 3-llm-verification.ts
import OpenAI from 'openai';
import { TrackRecord, CanonicalTrack, TrackMapping } from './types';
interface LLMVerificationRequest {
track: TrackRecord;
candidates: CanonicalTrack[];
}
interface LLMVerificationResponse {
canonical_id: number | null; // null = новый канонический трек
confidence: number;
reason: string;
}
export class LLMVerificationService {
private openai: OpenAI;
constructor(openaiApiKey: string) {
this.openai = new OpenAI({ apiKey: openaiApiKey });
}
private createPrompt(request: LLMVerificationRequest): string {
const candidatesList = request.candidates
.map((c, i) => `${i + 1}. "${c.canonical_title}" - ${c.canonical_artist} (ID: ${c.canonical_id})`)
.join('\n');
return `Ты эксперт по нормализации музыкальных данных. Определи, к какому каноническому треку относится данная запись.
Исходная запись:
- Название: "${request.track.original_title}"
- Исполнитель: "${request.track.original_artist}"
Кандидаты в канонические треки:
${candidatesList}
Задача: Определи, является ли исходная запись вариантом одного из кандидатов (опечатка, перефразирование, альтернативное название) или это новый уникальный трек.
Верни ответ в формате JSON:
{
"canonical_id": <номер ID кандидата или null, если это новый трек>,
"confidence": <число от 0 до 1, уверенность в решении>,
"reason": "<краткое объяснение решения>"
}
Если исходная запись точно соответствует одному из кандидатов (с учетом опечаток), верни его ID.
Если это явно другой трек, верни null для canonical_id.`;
}
async verifyBatch(
requests: LLMVerificationRequest[],
batchSize: number = 10
): Promise<TrackMapping[]> {
const mappings: TrackMapping[] = [];
for (let i = 0; i < requests.length; i += batchSize) {
const batch = requests.slice(i, i + batchSize);
const promises = batch.map(async (request) => {
try {
const prompt = this.createPrompt(request);
const response = await this.openai.chat.completions.create({
model: 'gpt-4-turbo-preview',
messages: [
{
role: 'system',
content: 'Ты помощник для нормализации музыкальных данных. Всегда отвечай валидным JSON.',
},
{
role: 'user',
content: prompt,
},
],
response_format: { type: 'json_object' },
temperature: 0.3, // Низкая температура для консистентности
});
const content = response.choices[0].message.content;
if (!content) {
throw new Error('Пустой ответ от LLM');
}
const result: LLMVerificationResponse = JSON.parse(content);
return {
original_record_id: request.track.id,
canonical_id: result.canonical_id || -1, // -1 означает новый канонический трек
match_type: result.canonical_id ? 'llm_verified' : 'new_canonical',
match_confidence: result.confidence,
verification_reason: result.reason,
} as TrackMapping;
} catch (error) {
console.error(`Ошибка при верификации трека ${request.track.id}:`, error);
// Fallback: создаем новый канонический трек
return {
original_record_id: request.track.id,
canonical_id: -1,
match_type: 'new_canonical',
match_confidence: 0.5,
verification_reason: 'Ошибка при верификации LLM',
} as TrackMapping;
}
});
const batchResults = await Promise.all(promises);
mappings.push(...batchResults);
console.log(`Обработано ${Math.min(i + batchSize, requests.length)}/${requests.length} запросов`);
// Небольшая задержка для соблюдения rate limits
if (i + batchSize < requests.length) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
return mappings;
}
}Этап 4: Финальная сборка и сохранение
// 4-finalize.ts
import { Pool } from 'pg';
import { TrackMapping, CanonicalTrack } from './types';
export class DatabaseService {
private pool: Pool;
constructor(connectionString: string) {
this.pool = new Pool({ connectionString });
}
async initializeSchema() {
await this.pool.query(`
CREATE TABLE IF NOT EXISTS canonical_tracks (
canonical_id BIGSERIAL PRIMARY KEY,
canonical_title TEXT NOT NULL,
canonical_artist TEXT NOT NULL,
normalized_key TEXT UNIQUE,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS track_mappings (
original_record_id BIGINT PRIMARY KEY,
canonical_id BIGINT REFERENCES canonical_tracks(canonical_id),
match_type TEXT NOT NULL,
match_confidence FLOAT,
verification_reason TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_track_mappings_canonical
ON track_mappings(canonical_id);
`);
}
async saveCanonicalTracks(canonicals: CanonicalTrack[]) {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
for (const canonical of canonicals) {
await client.query(
`INSERT INTO canonical_tracks
(canonical_id, canonical_title, canonical_artist, normalized_key)
VALUES ($1, $2, $3, $4)
ON CONFLICT (normalized_key) DO NOTHING`,
[
canonical.canonical_id,
canonical.canonical_title,
canonical.canonical_artist,
canonical.normalized_key,
]
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async saveTrackMappings(mappings: TrackMapping[]) {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Для новых канонических треков (canonical_id = -1) создаем записи
const newCanonicals = mappings.filter(m => m.canonical_id === -1);
for (const mapping of newCanonicals) {
// Здесь нужно получить оригинальный трек и создать канонический
// Упрощенная версия
}
const batchSize = 1000;
for (let i = 0; i < mappings.length; i += batchSize) {
const batch = mappings.slice(i, i + batchSize);
const values = batch.map((m, idx) => {
const base = idx * 5;
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5})`;
}).join(', ');
const params: any[] = [];
batch.forEach(m => {
params.push(
m.original_record_id,
m.canonical_id,
m.match_type,
m.match_confidence,
m.verification_reason
);
});
await client.query(
`INSERT INTO track_mappings
(original_record_id, canonical_id, match_type, match_confidence, verification_reason)
VALUES ${values}
ON CONFLICT (original_record_id) DO UPDATE SET
canonical_id = EXCLUDED.canonical_id,
match_type = EXCLUDED.match_type,
match_confidence = EXCLUDED.match_confidence`,
params
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}Главный пайплайн
// main.ts
import { analyzeTracks } from './0-analysis';
import { clusterByHash, extractCanonicalCandidates } from './1-text-normalization';
import { EmbeddingService } from './2-vector-search';
import { LLMVerificationService } from './3-llm-verification';
import { DatabaseService } from './4-finalize';
import { TrackRecord } from './types';
export async function normalizeTracksPipeline(
tracks: TrackRecord[],
config: {
openaiApiKey: string;
chromaUrl: string;
dbConnectionString: string;
autoMatchThreshold?: number;
}
) {
console.log('=== Этап 0: Анализ данных ===');
await analyzeTracks(tracks);
console.log('\n=== Этап 1: Текстовая нормализация ===');
const clusters = clusterByHash(tracks);
console.log(`Создано ${clusters.size} кластеров`);
const canonicalCandidates = extractCanonicalCandidates(clusters);
console.log(`Извлечено ${canonicalCandidates.length} канонических кандидатов`);
console.log('\n=== Этап 2: Векторный поиск ===');
const embeddingService = new EmbeddingService(config.openaiApiKey, config.chromaUrl);
await embeddingService.initializeCollection();
await embeddingService.indexCanonicalTracks(canonicalCandidates);
const { autoMatched, needsLLMVerification } = await embeddingService.matchTracks(
tracks,
config.autoMatchThreshold || 0.95
);
console.log(`Автоматически сопоставлено: ${autoMatched.length}`);
console.log(`Требуют верификации LLM: ${needsLLMVerification.length}`);
console.log('\n=== Этап 3: Верификация через LLM ===');
const llmService = new LLMVerificationService(config.openaiApiKey);
const llmMappings = await llmService.verifyBatch(needsLLMVerification);
console.log('\n=== Этап 4: Сохранение результатов ===');
const dbService = new DatabaseService(config.dbConnectionString);
await dbService.initializeSchema();
await dbService.saveCanonicalTracks(canonicalCandidates);
await dbService.saveTrackMappings([...autoMatched, ...llmMappings]);
console.log('\n=== Готово! ===');
return {
totalTracks: tracks.length,
canonicalTracks: canonicalCandidates.length,
autoMatched: autoMatched.length,
llmVerified: llmMappings.length,
};
}
// Использование
async function main() {
// Загрузка данных из БД
const tracks: TrackRecord[] = []; // Загрузить из вашей БД
const results = await normalizeTracksPipeline(tracks, {
openaiApiKey: process.env.OPENAI_API_KEY!,
chromaUrl: process.env.CHROMA_URL || 'http://localhost:8000',
dbConnectionString: process.env.DATABASE_URL!,
autoMatchThreshold: 0.95,
});
console.log('Результаты:', results);
}
if (require.main === module) {
main().catch(console.error);
}Применяемые паттерны проектирования
1. Multi-stage Pipeline (Многоэтапный пайплайн)
Каскадная обработка: от дешевых методов (текстовая нормализация) к дорогим (LLM). Каждый этап фильтрует данные, уменьшая нагрузку на следующий.
2. Retrieval Augmented Generation (RAG)
Векторный поиск похожих треков через embedding’и. Канонические треки индексируются в векторной БД, затем для каждой записи ищутся ближайшие соседи.
3. Incremental Model Querying
Пошаговые запросы к LLM только для сложных случаев (когда embedding-сходство недостаточно). Экономит токены и повышает точность.
4. Prompt/Response Optimiser
Стандартизация формата ответов LLM через response_format: { type: 'json_object' } и четкие инструкции в промпте.
Оптимизации и рекомендации
- Итеративность: Запустите на выборке 10-50k записей, оцените качество, отрегулируйте пороги
- Батчинг: Все API-вызовы (embedding, LLM) выполняются батчами для эффективности
- Кэширование: Embedding’и для канонических треков вычисляются один раз
- Параллелизация: Этап 2 можно распараллелить на несколько воркеров
- Мониторинг: Отслеживайте стоимость API-вызовов и качество сопоставлений
Оценка стоимости
Для 100 млн записей:
- Этап 1: Бесплатно (локальная обработка)
- Этап 2: ~$2,000-8,000 (embedding для канонических кандидатов, ~5-20 млн уникальных записей после кластеризации)
- Этап 3: ~$50,000-200,000 (LLM-верификация для 10-20% записей, ~10-20 млн запросов, с использованием GPT-3.5-turbo для большинства случаев)
Итого: ~$52,000-208,000 для полной нормализации 100 млн записей.
Оптимизации для снижения стоимости:
- Использование более дешевых embedding моделей (text-embedding-3-small вместо ada-002)
- Применение GPT-3.5-turbo для 80% случаев, GPT-4 только для сложных (confidence < 0.8)
- Кэширование embedding’ов для повторяющихся запросов
- Инкрементальная обработка: обрабатывать новые записи по мере поступления
Читай также
- Паттерны проектирования ИИ-агентов - Каталог из 18 архитектурных паттернов
- Retrieval Augmented Generation - Паттерн расширения знаний агентов
- Incremental Model Querying - Пошаговые запросы к модели