모듈형 RAG(Retrieval-Augmented Generation) 시스템을 구축할 때, MCP 연동과 Neon의 pgvector, LangGraph 구조를 활용한 데이터 파이프라인 구성 방법을 알아보겠습니다.
기존 Neon DB에 pgvector extension을 추가하는 것이 훨씬 효율적입니다! 그 이유와 구체적인 구현 방법을 단계별로 설명드리겠습니다.
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL, -- 상품명
description TEXT, -- 임베딩 원문용 (옵션)
price INTEGER NOT NULL CHECK (price >= 0), -- 가격 (원 단위)
category VARCHAR(100), -- 카테고리 (추후 필터링용)
brand VARCHAR(100), -- 브랜드 (선택)
is_active BOOLEAN DEFAULT TRUE, -- 판매 여부 (실무에서 거의 필수)
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- 같은 DB 내에서 비즈니스 데이터와 벡터를 JOIN
SELECT
p.id,
p.name,
p.price,
v.similarity
FROM products p
JOIN product_vectors v ON p.id = v.product_id
WHERE v.embedding <=> $1 < 0.3 -- 벡터 유사도
AND p.price BETWEEN 100000 AND 200000 -- 비즈니스 로직
ORDER BY v.similarity DESC;
# 하나의 DB 연결로 모든 작업 가능
async with pool.acquire() as conn:
# 1. 비즈니스 로직
await conn.execute("UPDATE products SET stock = stock - 1 WHERE id = $1", product_id)
# 2. 벡터 검색
similar = await conn.fetch("SELECT * FROM products WHERE embedding <=> $1 < 0.3", query_vec)
-- Neon SQL Editor에서 실행
CREATE EXTENSION IF NOT EXISTS vector;
-- 설치 확인
SELECT * FROM pg_extension WHERE extname = 'vector';
실제 프로젝트를 기준으로 벡터 테이블 설계 예시를 보겠습니다:
-- 1. 제품 벡터 테이블 (기존 products 테이블과 연동)
CREATE TABLE product_embeddings (
id SERIAL PRIMARY KEY,
product_id INTEGER REFERENCES products(id) ON DELETE CASCADE,
embedding vector(1536), -- OpenAI text-embedding-3-small
embedding_model VARCHAR(50) DEFAULT 'text-embedding-3-small',
content_hash VARCHAR(64), -- 내용 변경 감지용
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
UNIQUE(product_id)
);
-- 2. 문서/FAQ 벡터 테이블
CREATE TABLE document_embeddings (
id SERIAL PRIMARY KEY,
doc_type VARCHAR(50), -- 'faq', 'manual', 'policy' 등
doc_id VARCHAR(100),
title TEXT,
content TEXT,
embedding vector(1536),
metadata JSONB, -- 도메인별 추가 정보
created_at TIMESTAMP DEFAULT NOW()
);
-- 3. 사용자 쿼리 히스토리 (학습용)
CREATE TABLE query_history (
id SERIAL PRIMARY KEY,
user_id INTEGER,
query TEXT,
query_embedding vector(1536),
response TEXT,
feedback INTEGER, -- 1: 좋음, -1: 나쁨
domain VARCHAR(50), -- 'consumer', 'admin', 'manufacturer', 'partner'
created_at TIMESTAMP DEFAULT NOW()
);
-- 인덱스 생성 (HNSW가 IVFFlat보다 빠름)
CREATE INDEX product_emb_idx ON product_embeddings
USING hnsw (embedding vector_cosine_ops);
CREATE INDEX doc_emb_idx ON document_embeddings
USING hnsw (embedding vector_cosine_ops);
CREATE INDEX query_emb_idx ON query_history
USING hnsw (query_embedding vector_cosine_ops);
기존 데이터를 벡터로 변환하는 마이그레이션 스크립트입니다:
# migration_script.py
import asyncio
import asyncpg
from openai import AsyncOpenAI
from typing import List, Dict
class VectorMigration:
def __init__(self, database_url: str, openai_api_key: str):
self.database_url = database_url
self.client = AsyncOpenAI(api_key=openai_api_key)
async def migrate_products(self):
"""기존 products 테이블 데이터를 벡터화"""
conn = await asyncpg.connect(self.database_url)
try:
# 1. 기존 제품 데이터 조회
products = await conn.fetch("""
SELECT id, name, description, category
FROM products
WHERE id NOT IN (SELECT product_id FROM product_embeddings)
""")
print(f"벡터화할 제품 수: {len(products)}")
# 2. 배치로 처리 (OpenAI API 제한 고려)
batch_size = 100
for i in range(0, len(products), batch_size):
batch = products[i:i+batch_size]
await self._process_product_batch(conn, batch)
print(f"진행률: {min(i+batch_size, len(products))}/{len(products)}")
finally:
await conn.close()
async def _process_product_batch(
self,
conn: asyncpg.Connection,
products: List[asyncpg.Record]
):
"""제품 배치를 벡터화하여 저장"""
# 1. 텍스트 준비
texts = [
f"{p['name']}\n{p['description']}\n카테고리: {p['category']}"
for p in products
]
# 2. 임베딩 생성 (배치)
response = await self.client.embeddings.create(
model="text-embedding-3-small",
input=texts
)
# 3. DB에 저장
for product, embedding_obj in zip(products, response.data):
await conn.execute("""
INSERT INTO product_embeddings (product_id, embedding)
VALUES ($1, $2)
ON CONFLICT (product_id) DO UPDATE
SET embedding = $2, updated_at = NOW()
""", product['id'], embedding_obj.embedding)
# 실행
async def main():
migration = VectorMigration(
database_url="postgresql://...",
openai_api_key="sk-..."
)
await migration.migrate_products()
if __name__ == "__main__":
asyncio.run(main())
벡터 검색과 비즈니스 로직을 통합한 Repository 패턴 구현:
# app/core/repositories/product_repository.py
from typing import List, Optional, Dict, Any
import asyncpg
class ProductRepository:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def search_with_filters(
self,
query_embedding: List[float],
filters: Dict[str, Any],
limit: int = 10
) -> List[Dict[str, Any]]:
"""벡터 검색 + 비즈니스 필터 통합"""
# 동적 WHERE 절 생성
where_clauses = []
params = [query_embedding]
param_idx = 2
if filters.get('category'):
where_clauses.append(f"p.category = ${param_idx}")
params.append(filters['category'])
param_idx += 1
if filters.get('min_price'):
where_clauses.append(f"p.price >= ${param_idx}")
params.append(filters['min_price'])
param_idx += 1
if filters.get('max_price'):
where_clauses.append(f"p.price <= ${param_idx}")
params.append(filters['max_price'])
param_idx += 1
where_sql = "AND " + " AND ".join(where_clauses) if where_clauses else ""
# 통합 쿼리
query = f"""
SELECT
p.id,
p.name,
p.description,
p.price,
p.stock,
p.category,
1 - (pe.embedding <=> $1) as similarity
FROM products p
JOIN product_embeddings pe ON p.id = pe.product_id
WHERE 1=1 {where_sql}
ORDER BY pe.embedding <=> $1
LIMIT {limit}
"""
async with self.pool.acquire() as conn:
results = await conn.fetch(query, *params)
return [dict(r) for r in results]
async def get_with_similar(
self,
product_id: int,
limit: int = 5
) -> Dict[str, Any]:
"""제품 상세 + 유사 제품"""
async with self.pool.acquire() as conn:
# 1. 제품 상세
product = await conn.fetchrow("""
SELECT p.*, pe.embedding
FROM products p
LEFT JOIN product_embeddings pe ON p.id = pe.product_id
WHERE p.id = $1
""", product_id)
if not product or not product['embedding']:
return None
# 2. 유사 제품
similar = await conn.fetch("""
SELECT
p.id,
p.name,
p.price,
1 - (pe.embedding <=> $1) as similarity
FROM products p
JOIN product_embeddings pe ON p.id = pe.product_id
WHERE p.id != $2
ORDER BY pe.embedding <=> $1
LIMIT $3
""", product['embedding'], product_id, limit)
return {
'product': dict(product),
'similar_products': [dict(s) for s in similar]
}
사용자별 맞춤형 RAG 서비스 구현:
# app/domain/consumer/services/rag_service.py
class ConsumerRAGService:
def __init__(
self,
pool: asyncpg.Pool,
openai_client: AsyncOpenAI
):
self.pool = pool
self.client = openai_client
self.product_repo = ProductRepository(pool)
async def semantic_product_search(
self,
query: str,
category: Optional[str] = None,
price_range: Optional[tuple] = None
) -> Dict[str, Any]:
"""소비자용 의미론적 제품 검색"""
# 1. 쿼리 벡터화
query_emb = await self._embed(query)
# 2. 필터 구성
filters = {}
if category:
filters['category'] = category
if price_range:
filters['min_price'], filters['max_price'] = price_range
# 3. 통합 검색
products = await self.product_repo.search_with_filters(
query_embedding=query_emb,
filters=filters,
limit=10
)
# 4. LLM으로 결과 요약
summary = await self._summarize_results(query, products)
return {
'query': query,
'products': products,
'summary': summary,
'count': len(products)
}
async def _embed(self, text: str) -> List[float]:
response = await self.client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
데이터 변경 시 자동으로 벡터화를 수행하는 트리거 시스템:
-- 제품이 추가/수정될 때 자동으로 벡터화 대기열에 추가
CREATE TABLE embedding_queue (
id SERIAL PRIMARY KEY,
table_name VARCHAR(50),
record_id INTEGER,
status VARCHAR(20) DEFAULT 'pending', -- pending, processing, completed, failed
created_at TIMESTAMP DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION queue_embedding_update()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO embedding_queue (table_name, record_id)
VALUES (TG_TABLE_NAME, NEW.id)
ON CONFLICT DO NOTHING;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- 트리거 생성
CREATE TRIGGER product_embedding_trigger
AFTER INSERT OR UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION queue_embedding_update();
벡터화 대기열을 처리하는 백그라운드 워커:
# worker.py - 대기열 처리
import asyncio
class EmbeddingWorker:
async def process_queue(self):
"""주기적으로 대기열 처리"""
while True:
async with self.pool.acquire() as conn:
# 1. 대기 중인 작업 가져오기
jobs = await conn.fetch("""
UPDATE embedding_queue
SET status = 'processing'
WHERE id IN (
SELECT id FROM embedding_queue
WHERE status = 'pending'
LIMIT 10
)
RETURNING *
""")
# 2. 처리
for job in jobs:
await self._process_job(conn, job)
await asyncio.sleep(5) # 5초마다 확인
┌─────────────────────────────────────────────────┐
│ Neon PostgreSQL (기존 DB) │
├─────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌────────────────────────┐ │
│ │ 기존 테이블 │ │ pgvector 테이블 │ │
│ │ │ │ │ │
│ │ products │──│ product_embeddings │ │
│ │ users │ │ document_embeddings │ │
│ │ orders │ │ query_history │ │
│ └──────────────┘ └────────────────────────┘ │
│ │ │
│ HNSW Index │
└─────────────────────────────────────────────────┘
│
┌───────┴────────┐
│ │
┌───────▼──────┐ ┌──────▼────────┐
│ FastAPI │ │ LangGraph │
│ Endpoints │ │ Pipeline │
└──────────────┘ └───────────────┘
기존 Neon DB에 pgvector extension을 추가하는 것이 데이터 일관성, 관리 편의성, 비용 측면에서 모두 유리합니다.
별도 벡터 DB는 다음과 같은 경우에만 고려하세요:
대부분의 프로젝트에서는 Neon + pgvector 조합이 최적의 선택입니다!
이 글이 도움이 되셨다면 댓글로 피드백을 남겨주세요. 더 자세한 구현 예제나 특정 부분에 대한 질문도 환영합니다!