모듈형 RAG(Retrieval-Augmented Generation) 시스템을 구축할 때, MCP 연동과 Neon의 pgvector, LangGraph 구조를 활용한 데이터 파이프라인 구성 방법을 알아보겠습니다.

기존 Neon DB에 pgvector extension을 추가하는 것이 훨씬 효율적입니다! 그 이유와 구체적인 구현 방법을 단계별로 설명드리겠습니다.

목차

  1. 기존 DB 확장이 나은 이유
    1. 0. 상품 테이블
    2. 1. 데이터 일관성 & 트랜잭션
    3. 2. 관리 포인트 단일화
    4. 3. 비용 효율
    5. 4. 개발 편의성
  2. 기존 DB 확장 실행 계획
    1. Step 1: pgvector Extension 추가
    2. Step 2: 기존 테이블 구조에 맞춘 벡터 테이블 설계
    3. Step 3: 기존 데이터 벡터화 마이그레이션
    4. Step 4: 통합 Repository 패턴
    5. Step 5: 도메인별 벡터 검색 서비스
    6. Step 6: 트리거로 자동 벡터화
    7. Step 7: 백그라운드 워커
  3. 권장 아키텍처
  4. 결론
    1. 핵심 장점 요약
    2. 언제 별도 DB를 고려할까?

기존 DB 확장이 나은 이유

0. 상품 테이블


CREATE TABLE products (
    id SERIAL PRIMARY KEY,

    name TEXT NOT NULL,              -- 상품명
    description TEXT,                -- 임베딩 원문용 (옵션)
    
    price INTEGER NOT NULL CHECK (price >= 0),  -- 가격 (원 단위)

    category VARCHAR(100),            -- 카테고리 (추후 필터링용)
    brand VARCHAR(100),               -- 브랜드 (선택)

    is_active BOOLEAN DEFAULT TRUE,   -- 판매 여부 (실무에서 거의 필수)

    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);


1. 데이터 일관성 & 트랜잭션

-- 같은 DB 내에서 비즈니스 데이터와 벡터를 JOIN
SELECT 
    p.id,
    p.name,
    p.price,
    v.similarity
FROM products p
JOIN product_vectors v ON p.id = v.product_id
WHERE v.embedding <=> $1 < 0.3  -- 벡터 유사도
AND p.price BETWEEN 100000 AND 200000  -- 비즈니스 로직
ORDER BY v.similarity DESC;

2. 관리 포인트 단일화

3. 비용 효율

4. 개발 편의성

# 하나의 DB 연결로 모든 작업 가능
async with pool.acquire() as conn:
    # 1. 비즈니스 로직
    await conn.execute("UPDATE products SET stock = stock - 1 WHERE id = $1", product_id)
    
    # 2. 벡터 검색
    similar = await conn.fetch("SELECT * FROM products WHERE embedding <=> $1 < 0.3", query_vec)

기존 DB 확장 실행 계획

Step 1: pgvector Extension 추가

-- Neon SQL Editor에서 실행
CREATE EXTENSION IF NOT EXISTS vector;

-- 설치 확인
SELECT * FROM pg_extension WHERE extname = 'vector';

Step 2: 기존 테이블 구조에 맞춘 벡터 테이블 설계

실제 프로젝트를 기준으로 벡터 테이블 설계 예시를 보겠습니다:

-- 1. 제품 벡터 테이블 (기존 products 테이블과 연동)
CREATE TABLE product_embeddings (
    id SERIAL PRIMARY KEY,
    product_id INTEGER REFERENCES products(id) ON DELETE CASCADE,
    embedding vector(1536),  -- OpenAI text-embedding-3-small
    embedding_model VARCHAR(50) DEFAULT 'text-embedding-3-small',
    content_hash VARCHAR(64),  -- 내용 변경 감지용
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(product_id)
);

-- 2. 문서/FAQ 벡터 테이블
CREATE TABLE document_embeddings (
    id SERIAL PRIMARY KEY,
    doc_type VARCHAR(50),  -- 'faq', 'manual', 'policy' 등
    doc_id VARCHAR(100),
    title TEXT,
    content TEXT,
    embedding vector(1536),
    metadata JSONB,  -- 도메인별 추가 정보
    created_at TIMESTAMP DEFAULT NOW()
);

-- 3. 사용자 쿼리 히스토리 (학습용)
CREATE TABLE query_history (
    id SERIAL PRIMARY KEY,
    user_id INTEGER,
    query TEXT,
    query_embedding vector(1536),
    response TEXT,
    feedback INTEGER,  -- 1: 좋음, -1: 나쁨
    domain VARCHAR(50),  -- 'consumer', 'admin', 'manufacturer', 'partner'
    created_at TIMESTAMP DEFAULT NOW()
);

-- 인덱스 생성 (HNSW가 IVFFlat보다 빠름)
CREATE INDEX product_emb_idx ON product_embeddings 
USING hnsw (embedding vector_cosine_ops);

CREATE INDEX doc_emb_idx ON document_embeddings 
USING hnsw (embedding vector_cosine_ops);

CREATE INDEX query_emb_idx ON query_history 
USING hnsw (query_embedding vector_cosine_ops);

Step 3: 기존 데이터 벡터화 마이그레이션

기존 데이터를 벡터로 변환하는 마이그레이션 스크립트입니다:

# migration_script.py
import asyncio
import asyncpg
from openai import AsyncOpenAI
from typing import List, Dict

class VectorMigration:
    def __init__(self, database_url: str, openai_api_key: str):
        self.database_url = database_url
        self.client = AsyncOpenAI(api_key=openai_api_key)
        
    async def migrate_products(self):
        """기존 products 테이블 데이터를 벡터화"""
        conn = await asyncpg.connect(self.database_url)
        
        try:
            # 1. 기존 제품 데이터 조회
            products = await conn.fetch("""
                SELECT id, name, description, category
                FROM products
                WHERE id NOT IN (SELECT product_id FROM product_embeddings)
            """)
            
            print(f"벡터화할 제품 수: {len(products)}")
            
            # 2. 배치로 처리 (OpenAI API 제한 고려)
            batch_size = 100
            for i in range(0, len(products), batch_size):
                batch = products[i:i+batch_size]
                await self._process_product_batch(conn, batch)
                print(f"진행률: {min(i+batch_size, len(products))}/{len(products)}")
                
        finally:
            await conn.close()
    
    async def _process_product_batch(
        self, 
        conn: asyncpg.Connection, 
        products: List[asyncpg.Record]
    ):
        """제품 배치를 벡터화하여 저장"""
        # 1. 텍스트 준비
        texts = [
            f"{p['name']}\n{p['description']}\n카테고리: {p['category']}"
            for p in products
        ]
        
        # 2. 임베딩 생성 (배치)
        response = await self.client.embeddings.create(
            model="text-embedding-3-small",
            input=texts
        )
        
        # 3. DB에 저장
        for product, embedding_obj in zip(products, response.data):
            await conn.execute("""
                INSERT INTO product_embeddings (product_id, embedding)
                VALUES ($1, $2)
                ON CONFLICT (product_id) DO UPDATE
                SET embedding = $2, updated_at = NOW()
            """, product['id'], embedding_obj.embedding)

# 실행
async def main():
    migration = VectorMigration(
        database_url="postgresql://...",
        openai_api_key="sk-..."
    )
    await migration.migrate_products()

if __name__ == "__main__":
    asyncio.run(main())

Step 4: 통합 Repository 패턴

벡터 검색과 비즈니스 로직을 통합한 Repository 패턴 구현:

# app/core/repositories/product_repository.py
from typing import List, Optional, Dict, Any
import asyncpg

class ProductRepository:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool
    
    async def search_with_filters(
        self,
        query_embedding: List[float],
        filters: Dict[str, Any],
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """벡터 검색 + 비즈니스 필터 통합"""
        
        # 동적 WHERE 절 생성
        where_clauses = []
        params = [query_embedding]
        param_idx = 2
        
        if filters.get('category'):
            where_clauses.append(f"p.category = ${param_idx}")
            params.append(filters['category'])
            param_idx += 1
        
        if filters.get('min_price'):
            where_clauses.append(f"p.price >= ${param_idx}")
            params.append(filters['min_price'])
            param_idx += 1
        
        if filters.get('max_price'):
            where_clauses.append(f"p.price <= ${param_idx}")
            params.append(filters['max_price'])
            param_idx += 1
        
        where_sql = "AND " + " AND ".join(where_clauses) if where_clauses else ""
        
        # 통합 쿼리
        query = f"""
            SELECT 
                p.id,
                p.name,
                p.description,
                p.price,
                p.stock,
                p.category,
                1 - (pe.embedding <=> $1) as similarity
            FROM products p
            JOIN product_embeddings pe ON p.id = pe.product_id
            WHERE 1=1 {where_sql}
            ORDER BY pe.embedding <=> $1
            LIMIT {limit}
        """
        
        async with self.pool.acquire() as conn:
            results = await conn.fetch(query, *params)
        
        return [dict(r) for r in results]
    
    async def get_with_similar(
        self, 
        product_id: int, 
        limit: int = 5
    ) -> Dict[str, Any]:
        """제품 상세 + 유사 제품"""
        async with self.pool.acquire() as conn:
            # 1. 제품 상세
            product = await conn.fetchrow("""
                SELECT p.*, pe.embedding
                FROM products p
                LEFT JOIN product_embeddings pe ON p.id = pe.product_id
                WHERE p.id = $1
            """, product_id)
            
            if not product or not product['embedding']:
                return None
            
            # 2. 유사 제품
            similar = await conn.fetch("""
                SELECT 
                    p.id,
                    p.name,
                    p.price,
                    1 - (pe.embedding <=> $1) as similarity
                FROM products p
                JOIN product_embeddings pe ON p.id = pe.product_id
                WHERE p.id != $2
                ORDER BY pe.embedding <=> $1
                LIMIT $3
            """, product['embedding'], product_id, limit)
            
            return {
                'product': dict(product),
                'similar_products': [dict(s) for s in similar]
            }

Step 5: 도메인별 벡터 검색 서비스

사용자별 맞춤형 RAG 서비스 구현:

# app/domain/consumer/services/rag_service.py
class ConsumerRAGService:
    def __init__(
        self, 
        pool: asyncpg.Pool,
        openai_client: AsyncOpenAI
    ):
        self.pool = pool
        self.client = openai_client
        self.product_repo = ProductRepository(pool)
    
    async def semantic_product_search(
        self,
        query: str,
        category: Optional[str] = None,
        price_range: Optional[tuple] = None
    ) -> Dict[str, Any]:
        """소비자용 의미론적 제품 검색"""
        
        # 1. 쿼리 벡터화
        query_emb = await self._embed(query)
        
        # 2. 필터 구성
        filters = {}
        if category:
            filters['category'] = category
        if price_range:
            filters['min_price'], filters['max_price'] = price_range
        
        # 3. 통합 검색
        products = await self.product_repo.search_with_filters(
            query_embedding=query_emb,
            filters=filters,
            limit=10
        )
        
        # 4. LLM으로 결과 요약
        summary = await self._summarize_results(query, products)
        
        return {
            'query': query,
            'products': products,
            'summary': summary,
            'count': len(products)
        }
    
    async def _embed(self, text: str) -> List[float]:
        response = await self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

Step 6: 트리거로 자동 벡터화

데이터 변경 시 자동으로 벡터화를 수행하는 트리거 시스템:

-- 제품이 추가/수정될 때 자동으로 벡터화 대기열에 추가
CREATE TABLE embedding_queue (
    id SERIAL PRIMARY KEY,
    table_name VARCHAR(50),
    record_id INTEGER,
    status VARCHAR(20) DEFAULT 'pending',  -- pending, processing, completed, failed
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION queue_embedding_update()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO embedding_queue (table_name, record_id)
    VALUES (TG_TABLE_NAME, NEW.id)
    ON CONFLICT DO NOTHING;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- 트리거 생성
CREATE TRIGGER product_embedding_trigger
AFTER INSERT OR UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION queue_embedding_update();

Step 7: 백그라운드 워커

벡터화 대기열을 처리하는 백그라운드 워커:

# worker.py - 대기열 처리
import asyncio

class EmbeddingWorker:
    async def process_queue(self):
        """주기적으로 대기열 처리"""
        while True:
            async with self.pool.acquire() as conn:
                # 1. 대기 중인 작업 가져오기
                jobs = await conn.fetch("""
                    UPDATE embedding_queue
                    SET status = 'processing'
                    WHERE id IN (
                        SELECT id FROM embedding_queue
                        WHERE status = 'pending'
                        LIMIT 10
                    )
                    RETURNING *
                """)
                
                # 2. 처리
                for job in jobs:
                    await self._process_job(conn, job)
            
            await asyncio.sleep(5)  # 5초마다 확인

권장 아키텍처

┌─────────────────────────────────────────────────┐
│         Neon PostgreSQL (기존 DB)               │
├─────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌────────────────────────┐  │
│  │ 기존 테이블  │  │   pgvector 테이블      │  │
│  │             │  │                        │  │
│  │ products    │──│ product_embeddings     │  │
│  │ users       │  │ document_embeddings    │  │
│  │ orders      │  │ query_history          │  │
│  └──────────────┘  └────────────────────────┘  │
│                           │                     │
│                    HNSW Index                   │
└─────────────────────────────────────────────────┘
                    │
            ┌───────┴────────┐
            │                │
    ┌───────▼──────┐  ┌──────▼────────┐
    │ FastAPI      │  │ LangGraph     │
    │ Endpoints    │  │ Pipeline      │
    └──────────────┘  └───────────────┘

결론

기존 Neon DB에 pgvector extension을 추가하는 것이 데이터 일관성, 관리 편의성, 비용 측면에서 모두 유리합니다.

핵심 장점 요약

언제 별도 DB를 고려할까?

별도 벡터 DB는 다음과 같은 경우에만 고려하세요:

대부분의 프로젝트에서는 Neon + pgvector 조합이 최적의 선택입니다!


이 글이 도움이 되셨다면 댓글로 피드백을 남겨주세요. 더 자세한 구현 예제나 특정 부분에 대한 질문도 환영합니다!