Skip to Content
БлогНормализация 100 млн аудиотреков: Многоэтапный пайплайн с LLM

Нормализация 100 млн аудиотреков: Многоэтапный пайплайн с LLM

Архитектура нормализации аудиотреков

Задача: нормализовать 100 миллионов записей аудиотреков, где пользователи самостоятельно добавляли названия и исполнителей. Нужно выявить канонические версии треков и связать все варианты (включая опечатки, перефразирования и ошибки) с каноническими записями.

Проблема

  • 100 млн записей с названиями и исполнителями
  • Пользовательский ввод → опечатки, разные форматы, дубликаты
  • Нужно: разделить оригиналы и пользовательские варианты, исправить ошибки

Архитектура решения: Многоэтапный пайплайн

Решение использует каскадный подход с тремя слоями обработки: от дешевых и быстрых методов к дорогим, но точным.

Применяемые паттерны проектирования

  1. Multi-stage Pipeline — многоэтапная обработка данных
  2. Retrieval Augmented Generation (RAG) — поиск похожих треков через векторные embedding’и
  3. Incremental Model Querying — пошаговые запросы к LLM для сложных случаев
  4. Prompt/Response Optimiser — стандартизация формата ответов LLM

Этап 0: Подготовка и анализ

// types.ts export interface TrackRecord { id: number; original_title: string; original_artist: string; created_at?: Date; } export interface CanonicalTrack { canonical_id: number; canonical_title: string; canonical_artist: string; normalized_key: string; embedding?: number[]; } export interface TrackMapping { original_record_id: number; canonical_id: number; match_type: 'exact_hash' | 'embedding_auto' | 'llm_verified' | 'new_canonical'; match_confidence: number; verification_reason?: string; }
// 0-analysis.ts import { TrackRecord } from './types'; export async function analyzeTracks(tracks: TrackRecord[]) { const stats = { total: tracks.length, uniqueTitles: new Set<string>(), uniqueArtists: new Set<string>(), caseVariations: 0, specialChars: 0, }; for (const track of tracks) { stats.uniqueTitles.add(track.original_title.toLowerCase()); stats.uniqueArtists.add(track.original_artist.toLowerCase()); // Проверка на вариации регистра const normalized = track.original_title.toLowerCase().trim(); if (track.original_title !== normalized) { stats.caseVariations++; } // Проверка на спецсимволы if (/[()\[\]{}!@#$%^&*]/.test(track.original_title)) { stats.specialChars++; } } console.log('Анализ данных:'); console.log(`Всего записей: ${stats.total}`); console.log(`Уникальных названий: ${stats.uniqueTitles.size}`); console.log(`Уникальных исполнителей: ${stats.uniqueArtists.size}`); console.log(`Вариаций регистра: ${stats.caseVariations}`); console.log(`Записей со спецсимволами: ${stats.specialChars}`); return stats; }

Этап 1: Текстовая нормализация и кластеризация

Паттерн: Multi-stage Pipeline (дешевый слой)

// 1-text-normalization.ts import crypto from 'crypto'; import { TrackRecord, CanonicalTrack } from './types'; export function normalizeText(text: string): string { return text .toLowerCase() .normalize('NFD') // Разбить на базовые символы и диакритики .replace(/[\u0300-\u036f]/g, '') // Удалить диакритики .replace(/[()\[\]{}!@#$%^&*"']/g, '') // Удалить спецсимволы .replace(/\s+/g, ' ') // Нормализовать пробелы .trim(); } export function createNormalizedKey(title: string, artist: string): string { const normalizedTitle = normalizeText(title); const normalizedArtist = normalizeText(artist); // Удалить стоп-слова const stopWords = new Set(['the', 'a', 'an', 'official', 'video', 'remix', 'feat', 'ft']); const titleWords = normalizedTitle .split(' ') .filter(word => !stopWords.has(word)) .join(' '); return `${titleWords} ${normalizedArtist}`; } export function hashNormalizedKey(key: string): string { return crypto.createHash('sha256').update(key).digest('hex'); } export interface TrackCluster { hash: string; tracks: TrackRecord[]; canonicalCandidate: TrackRecord; } export function clusterByHash(tracks: TrackRecord[]): Map<string, TrackCluster> { const clusters = new Map<string, TrackCluster>(); for (const track of tracks) { const normalizedKey = createNormalizedKey(track.original_title, track.original_artist); const hash = hashNormalizedKey(normalizedKey); if (!clusters.has(hash)) { clusters.set(hash, { hash, tracks: [], canonicalCandidate: track, // Временно, будет пересчитано }); } clusters.get(hash)!.tracks.push(track); } // Выбрать канонического кандидата (самый частый или самый ранний) for (const cluster of clusters.values()) { // Сортируем по частоте (размер группы) и дате создания cluster.canonicalCandidate = cluster.tracks .sort((a, b) => { // Приоритет: больше треков в группе → раньше создан return cluster.tracks.length - cluster.tracks.length || (a.created_at?.getTime() || 0) - (b.created_at?.getTime() || 0); })[0]; } return clusters; } export function extractCanonicalCandidates(clusters: Map<string, TrackCluster>): CanonicalTrack[] { const canonicals: CanonicalTrack[] = []; let id = 1; for (const cluster of clusters.values()) { const candidate = cluster.canonicalCandidate; canonicals.push({ canonical_id: id++, canonical_title: candidate.original_title, canonical_artist: candidate.original_artist, normalized_key: createNormalizedKey( candidate.original_title, candidate.original_artist ), }); } return canonicals; }

Этап 2: Векторный поиск по embedding’ам

Паттерн: Retrieval Augmented Generation (RAG)

// 2-vector-search.ts import { ChromaClient } from 'chromadb'; import OpenAI from 'openai'; import { TrackRecord, CanonicalTrack, TrackMapping } from './types'; export class EmbeddingService { private openai: OpenAI; private chroma: ChromaClient; private collectionName = 'canonical_tracks'; constructor(openaiApiKey: string, chromaUrl: string = 'http://localhost:8000') { this.openai = new OpenAI({ apiKey: openaiApiKey }); this.chroma = new ChromaClient({ path: chromaUrl }); } async createEmbedding(text: string): Promise<number[]> { const response = await this.openai.embeddings.create({ model: 'text-embedding-3-small', // или text-embedding-ada-002 input: text, }); return response.data[0].embedding; } async initializeCollection() { try { await this.chroma.deleteCollection({ name: this.collectionName }); } catch (e) { // Коллекция не существует } await this.chroma.createCollection({ name: this.collectionName, }); } async indexCanonicalTracks(canonicals: CanonicalTrack[]) { const collection = await this.chroma.getCollection({ name: this.collectionName }); // Батчинг для эффективности const batchSize = 100; for (let i = 0; i < canonicals.length; i += batchSize) { const batch = canonicals.slice(i, i + batchSize); const texts = batch.map(c => `${c.canonical_title} - ${c.canonical_artist}`); const embeddings = await Promise.all( texts.map(text => this.createEmbedding(text)) ); await collection.add({ ids: batch.map(c => c.canonical_id.toString()), embeddings: embeddings, metadatas: batch.map(c => ({ title: c.canonical_title, artist: c.canonical_artist, normalized_key: c.normalized_key, })), }); console.log(`Проиндексировано ${Math.min(i + batchSize, canonicals.length)}/${canonicals.length}`); } } async findSimilarTracks( track: TrackRecord, topK: number = 5 ): Promise<Array<{ canonical: CanonicalTrack; similarity: number }>> { const collection = await this.chroma.getCollection({ name: this.collectionName }); const queryText = `${track.original_title} - ${track.original_artist}`; const queryEmbedding = await this.createEmbedding(queryText); const results = await collection.query({ queryEmbeddings: [queryEmbedding], nResults: topK, }); const matches: Array<{ canonical: CanonicalTrack; similarity: number }> = []; if (results.ids && results.ids[0]) { const ids = results.ids[0]; const distances = results.distances?.[0] || []; for (let i = 0; i < ids.length; i++) { const id = parseInt(ids[i]); const distance = distances[i]; const similarity = 1 - distance; // Косинусное расстояние → сходство const metadata = results.metadatas?.[0]?.[i]; if (metadata) { matches.push({ canonical: { canonical_id: id, canonical_title: metadata.title as string, canonical_artist: metadata.artist as string, normalized_key: metadata.normalized_key as string, }, similarity, }); } } } return matches; } async matchTracks( tracks: TrackRecord[], autoMatchThreshold: number = 0.95 ): Promise<{ autoMatched: TrackMapping[]; needsLLMVerification: Array<{ track: TrackRecord; candidates: CanonicalTrack[] }>; }> { const autoMatched: TrackMapping[] = []; const needsLLMVerification: Array<{ track: TrackRecord; candidates: CanonicalTrack[] }> = []; for (const track of tracks) { const similar = await this.findSimilarTracks(track, 5); if (similar.length > 0 && similar[0].similarity >= autoMatchThreshold) { // Автоматическое сопоставление autoMatched.push({ original_record_id: track.id, canonical_id: similar[0].canonical.canonical_id, match_type: 'embedding_auto', match_confidence: similar[0].similarity, }); } else if (similar.length > 0 && similar[0].similarity >= 0.7) { // Нужна верификация через LLM needsLLMVerification.push({ track, candidates: similar.map(s => s.canonical), }); } } return { autoMatched, needsLLMVerification }; } }

Этап 3: Верификация через LLM

Паттерны: Incremental Model Querying, Prompt/Response Optimiser

// 3-llm-verification.ts import OpenAI from 'openai'; import { TrackRecord, CanonicalTrack, TrackMapping } from './types'; interface LLMVerificationRequest { track: TrackRecord; candidates: CanonicalTrack[]; } interface LLMVerificationResponse { canonical_id: number | null; // null = новый канонический трек confidence: number; reason: string; } export class LLMVerificationService { private openai: OpenAI; constructor(openaiApiKey: string) { this.openai = new OpenAI({ apiKey: openaiApiKey }); } private createPrompt(request: LLMVerificationRequest): string { const candidatesList = request.candidates .map((c, i) => `${i + 1}. "${c.canonical_title}" - ${c.canonical_artist} (ID: ${c.canonical_id})`) .join('\n'); return `Ты эксперт по нормализации музыкальных данных. Определи, к какому каноническому треку относится данная запись. Исходная запись: - Название: "${request.track.original_title}" - Исполнитель: "${request.track.original_artist}" Кандидаты в канонические треки: ${candidatesList} Задача: Определи, является ли исходная запись вариантом одного из кандидатов (опечатка, перефразирование, альтернативное название) или это новый уникальный трек. Верни ответ в формате JSON: { "canonical_id": <номер ID кандидата или null, если это новый трек>, "confidence": <число от 0 до 1, уверенность в решении>, "reason": "<краткое объяснение решения>" } Если исходная запись точно соответствует одному из кандидатов (с учетом опечаток), верни его ID. Если это явно другой трек, верни null для canonical_id.`; } async verifyBatch( requests: LLMVerificationRequest[], batchSize: number = 10 ): Promise<TrackMapping[]> { const mappings: TrackMapping[] = []; for (let i = 0; i < requests.length; i += batchSize) { const batch = requests.slice(i, i + batchSize); const promises = batch.map(async (request) => { try { const prompt = this.createPrompt(request); const response = await this.openai.chat.completions.create({ model: 'gpt-4-turbo-preview', messages: [ { role: 'system', content: 'Ты помощник для нормализации музыкальных данных. Всегда отвечай валидным JSON.', }, { role: 'user', content: prompt, }, ], response_format: { type: 'json_object' }, temperature: 0.3, // Низкая температура для консистентности }); const content = response.choices[0].message.content; if (!content) { throw new Error('Пустой ответ от LLM'); } const result: LLMVerificationResponse = JSON.parse(content); return { original_record_id: request.track.id, canonical_id: result.canonical_id || -1, // -1 означает новый канонический трек match_type: result.canonical_id ? 'llm_verified' : 'new_canonical', match_confidence: result.confidence, verification_reason: result.reason, } as TrackMapping; } catch (error) { console.error(`Ошибка при верификации трека ${request.track.id}:`, error); // Fallback: создаем новый канонический трек return { original_record_id: request.track.id, canonical_id: -1, match_type: 'new_canonical', match_confidence: 0.5, verification_reason: 'Ошибка при верификации LLM', } as TrackMapping; } }); const batchResults = await Promise.all(promises); mappings.push(...batchResults); console.log(`Обработано ${Math.min(i + batchSize, requests.length)}/${requests.length} запросов`); // Небольшая задержка для соблюдения rate limits if (i + batchSize < requests.length) { await new Promise(resolve => setTimeout(resolve, 1000)); } } return mappings; } }

Этап 4: Финальная сборка и сохранение

// 4-finalize.ts import { Pool } from 'pg'; import { TrackMapping, CanonicalTrack } from './types'; export class DatabaseService { private pool: Pool; constructor(connectionString: string) { this.pool = new Pool({ connectionString }); } async initializeSchema() { await this.pool.query(` CREATE TABLE IF NOT EXISTS canonical_tracks ( canonical_id BIGSERIAL PRIMARY KEY, canonical_title TEXT NOT NULL, canonical_artist TEXT NOT NULL, normalized_key TEXT UNIQUE, created_at TIMESTAMP DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS track_mappings ( original_record_id BIGINT PRIMARY KEY, canonical_id BIGINT REFERENCES canonical_tracks(canonical_id), match_type TEXT NOT NULL, match_confidence FLOAT, verification_reason TEXT, created_at TIMESTAMP DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_track_mappings_canonical ON track_mappings(canonical_id); `); } async saveCanonicalTracks(canonicals: CanonicalTrack[]) { const client = await this.pool.connect(); try { await client.query('BEGIN'); for (const canonical of canonicals) { await client.query( `INSERT INTO canonical_tracks (canonical_id, canonical_title, canonical_artist, normalized_key) VALUES ($1, $2, $3, $4) ON CONFLICT (normalized_key) DO NOTHING`, [ canonical.canonical_id, canonical.canonical_title, canonical.canonical_artist, canonical.normalized_key, ] ); } await client.query('COMMIT'); } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } } async saveTrackMappings(mappings: TrackMapping[]) { const client = await this.pool.connect(); try { await client.query('BEGIN'); // Для новых канонических треков (canonical_id = -1) создаем записи const newCanonicals = mappings.filter(m => m.canonical_id === -1); for (const mapping of newCanonicals) { // Здесь нужно получить оригинальный трек и создать канонический // Упрощенная версия } const batchSize = 1000; for (let i = 0; i < mappings.length; i += batchSize) { const batch = mappings.slice(i, i + batchSize); const values = batch.map((m, idx) => { const base = idx * 5; return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5})`; }).join(', '); const params: any[] = []; batch.forEach(m => { params.push( m.original_record_id, m.canonical_id, m.match_type, m.match_confidence, m.verification_reason ); }); await client.query( `INSERT INTO track_mappings (original_record_id, canonical_id, match_type, match_confidence, verification_reason) VALUES ${values} ON CONFLICT (original_record_id) DO UPDATE SET canonical_id = EXCLUDED.canonical_id, match_type = EXCLUDED.match_type, match_confidence = EXCLUDED.match_confidence`, params ); } await client.query('COMMIT'); } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } } }

Главный пайплайн

// main.ts import { analyzeTracks } from './0-analysis'; import { clusterByHash, extractCanonicalCandidates } from './1-text-normalization'; import { EmbeddingService } from './2-vector-search'; import { LLMVerificationService } from './3-llm-verification'; import { DatabaseService } from './4-finalize'; import { TrackRecord } from './types'; export async function normalizeTracksPipeline( tracks: TrackRecord[], config: { openaiApiKey: string; chromaUrl: string; dbConnectionString: string; autoMatchThreshold?: number; } ) { console.log('=== Этап 0: Анализ данных ==='); await analyzeTracks(tracks); console.log('\n=== Этап 1: Текстовая нормализация ==='); const clusters = clusterByHash(tracks); console.log(`Создано ${clusters.size} кластеров`); const canonicalCandidates = extractCanonicalCandidates(clusters); console.log(`Извлечено ${canonicalCandidates.length} канонических кандидатов`); console.log('\n=== Этап 2: Векторный поиск ==='); const embeddingService = new EmbeddingService(config.openaiApiKey, config.chromaUrl); await embeddingService.initializeCollection(); await embeddingService.indexCanonicalTracks(canonicalCandidates); const { autoMatched, needsLLMVerification } = await embeddingService.matchTracks( tracks, config.autoMatchThreshold || 0.95 ); console.log(`Автоматически сопоставлено: ${autoMatched.length}`); console.log(`Требуют верификации LLM: ${needsLLMVerification.length}`); console.log('\n=== Этап 3: Верификация через LLM ==='); const llmService = new LLMVerificationService(config.openaiApiKey); const llmMappings = await llmService.verifyBatch(needsLLMVerification); console.log('\n=== Этап 4: Сохранение результатов ==='); const dbService = new DatabaseService(config.dbConnectionString); await dbService.initializeSchema(); await dbService.saveCanonicalTracks(canonicalCandidates); await dbService.saveTrackMappings([...autoMatched, ...llmMappings]); console.log('\n=== Готово! ==='); return { totalTracks: tracks.length, canonicalTracks: canonicalCandidates.length, autoMatched: autoMatched.length, llmVerified: llmMappings.length, }; } // Использование async function main() { // Загрузка данных из БД const tracks: TrackRecord[] = []; // Загрузить из вашей БД const results = await normalizeTracksPipeline(tracks, { openaiApiKey: process.env.OPENAI_API_KEY!, chromaUrl: process.env.CHROMA_URL || 'http://localhost:8000', dbConnectionString: process.env.DATABASE_URL!, autoMatchThreshold: 0.95, }); console.log('Результаты:', results); } if (require.main === module) { main().catch(console.error); }

Применяемые паттерны проектирования

1. Multi-stage Pipeline (Многоэтапный пайплайн)

Каскадная обработка: от дешевых методов (текстовая нормализация) к дорогим (LLM). Каждый этап фильтрует данные, уменьшая нагрузку на следующий.

2. Retrieval Augmented Generation (RAG)

Векторный поиск похожих треков через embedding’и. Канонические треки индексируются в векторной БД, затем для каждой записи ищутся ближайшие соседи.

3. Incremental Model Querying

Пошаговые запросы к LLM только для сложных случаев (когда embedding-сходство недостаточно). Экономит токены и повышает точность.

4. Prompt/Response Optimiser

Стандартизация формата ответов LLM через response_format: { type: 'json_object' } и четкие инструкции в промпте.

Оптимизации и рекомендации

  1. Итеративность: Запустите на выборке 10-50k записей, оцените качество, отрегулируйте пороги
  2. Батчинг: Все API-вызовы (embedding, LLM) выполняются батчами для эффективности
  3. Кэширование: Embedding’и для канонических треков вычисляются один раз
  4. Параллелизация: Этап 2 можно распараллелить на несколько воркеров
  5. Мониторинг: Отслеживайте стоимость API-вызовов и качество сопоставлений

Оценка стоимости

Для 100 млн записей:

  • Этап 1: Бесплатно (локальная обработка)
  • Этап 2: ~$2,000-8,000 (embedding для канонических кандидатов, ~5-20 млн уникальных записей после кластеризации)
  • Этап 3: ~$50,000-200,000 (LLM-верификация для 10-20% записей, ~10-20 млн запросов, с использованием GPT-3.5-turbo для большинства случаев)

Итого: ~$52,000-208,000 для полной нормализации 100 млн записей.

Оптимизации для снижения стоимости:

  • Использование более дешевых embedding моделей (text-embedding-3-small вместо ada-002)
  • Применение GPT-3.5-turbo для 80% случаев, GPT-4 только для сложных (confidence < 0.8)
  • Кэширование embedding’ов для повторяющихся запросов
  • Инкрементальная обработка: обрабатывать новые записи по мере поступления

Читай также

Last updated on