Привет Хабр! Меня зовут Владимир и я алкоголик решил выйти из зоны комфорта и начать чего-нибудь писать. В этом материале я хочу поделиться своими наработками по организации векторного хранилища в PostgreSQL и реализации базового API для него.
Дисклеймер
Данная статья не является ультимативным гайдом по вышеназванным инструментам. Это скорее фиксация моего личного опыта и попытка собрать работающий прототип “с нуля”.
А почему не…
Сразу к вопросу “А почему не Qdrant, Elasticsearch, ChromaDB, {ваш вариант}?” На самом деле PostgreSQL был выбран, т.к. рабочая база потихоньку переезжает на него. Никаких особых сравнений не проводил, скорее это было “Ух ты, у PostgreSQL есть вектор! Срочно пробовать!”
План
Собственно, для реализации API нам сегодня понадобится:
1. Естественно заиметь где-нибуть PostgreSQL
2. Создать таблицу.
3. Реализовать подключение к БД
4. Создать репозиторий (не тот, который Git, а тот, который паттерн)
5. Сделать API
6. Упаковать в контейнер
Ну, как говорится “Ну что, народ, погнали, на!”© (надеюсь кто-то выкупит отсылку…)
PostgreSQL и PGVector
Для начала необходимо где-то поднять базу данных. Можно установить локально, развернуть в облаке, на локальном сервере. Лично мне нравится работать с Докером – локально установленный PostgreSQL (по крайней мере у меня на винде) любит самозапускаться и занимать свой дефолтный порт. Да и нравится мне разные базы под разные проекты делать, чтобы не поломать ничего. Поэтому берем с Docker Hub последнюю версию образа PGVector и вперед запускать! В docker-compose.yml нужно прокинуть наружу порт для доступа к БД и провести её начальную инициализацию – собственно установить расширение PGVector. Для этого надо создать файл init.sql и прокинуть его в контейнер:
CREATE EXTENSION IF NOT EXISTS vector;
Я положил его в папку db_init. Ну и собственно сам docker-compose.yml:
services:
postgres:
image: pgvector/pgvector:0.8.1-pg18-trixie
environment:
POSTGRES_DB: ${DB__NAME:-vectordb}
POSTGRES_USER: ${DB__USER:-user}
POSTGRES_PASSWORD: ${DB__PASSWORD:-password}
volumes:
- pgdata:/var/lib/postgresql
- ./db_init:/docker-entrypoint-initdb.d
ports:
- "${DB__PORT:-5432}:${DB__PORT:-5432}"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${DB__USER:-user} -d ${DB__NAME:-vectordb}"]
interval: 5s
timeout: 5s
retries: 5
volumes:
pgdata:
В корне проекта создадим .env файл. Кроме параметров для БД в нём сразу укажем ожидаемую размерность эмбеддинга текста и параметры запуска сервиса (на будущее):
DB__NAME=vectordb
DB__USER=user
DB__PASSWORD=password
DB__HOST=localhost
DB__PORT=5432
DB__EMBEDDING_DIM=1024
SERVICE__HOST=0.0.0.0
SERVICE__PORT=8000
Поднимаем сервис (docker compose up -d если вдруг кто забыл) и немного ждём, пока скачается и развернется образ с хаба.
Когда консоль вернется в своё исходное состояние, желательно проверить, что база данных готова к работе (случай из жизни – в докер не подтянулись переменные из .env и база создалась с некорректными логин/паролем). Для проверки буду использовать программу DBeaver – она бесплатная и для целей проверки БД достаточно удобная. Качаем, ставим, запускаем, вводим данные из .env, и если всё правильно, то видим успешное окошко успеха. Заодно там-же можно посмотреть, что расширение PGVector так-же установилось.
Поздравляю! У нас есть база данных. Осталось её заполнить.
Создание таблицы
Для создания таблиц (в моём случае 1 шт.) можно использовать голый SQL. Но это не мой метод, поэтому будем реализовывать таблицу через SQLAlchemy alembic.
Создадим виртуальное окружение. У меня менеджер uv, но можно воспользоваться и ванильным pip
uv init
uv add alembic==1.18.4 asyncpg==0.31.0 pgvector==0.4.2 pydantic-settings==2.12.0 sqlalchemy==2.0.46
Если коротко, то через sqlalchemy будем работать с БД, pgvector нужна, чтобы как раз добавить в схемы Алхимии тип данных Vector, asyncpg – асинхронный движок для той-же Алхимии, alembicом будем делать миграции, ну а pydantic-settings – чисто для красоты, чтобы dotenv не использовать. Для корректности надо бы ещё было указать обычный pydantic, но т.к. он идёт в комплекте с pydantic-settings, то сэкономим несколько букв.
Теперь создадим sqlalchemy модели, которые потом будем мигрировать. Модели буду размещать в каталоге db_models. Заодно сразу сделаем его пакетом с помощью создания __init__.py
Сначала реализуем базовую модель, от которой будут наследоваться другие. Создадим файл db_modelsbase.py:
class Base(AsyncAttrs, DeclarativeBase):
__abstract__ = True
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
created_at: Mapped[datetime] = mapped_column(
server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(), onupdate=func.now())
@declared_attr.directive
def __tablename__(cls) -> str:
return cls.__name__.lower() 's'
Класс наследуется от двух других – базового класса самой Алхимии DeclarativeBase и класса AsyncAttrs, который добавит нашему базовому классу поддержку асинхронности.
Далее метим класс абстрактным (чтобы не создавал таблицу), добавляем стандартные поля – id и временные метки создания и изменения, которые будут проставляться автоматически.
Ну и немного трансмутации от Алхимии:
@declared_attr.directive
def __tablename__(cls) -> str:
return cls.__name__.lower() 's'
Данный код обеспечивает автоматическое превращение имени класса в имя таблицы.
Теперь реализуем класс для хранения векторизованных (векторизированных?) документов db_modelsdocuments.py:
from pgvector.sqlalchemy import Vector
from sqlalchemy.dialects.postgresql import JSONB
class Document(Base):
content: Mapped[str]
embedding: Mapped[np.ndarray] = mapped_column(Vector(settings.db.embedding_dim))
meta_data: Mapped[dict] = mapped_column(JSONB, nullable=False, default={})
Класс сделан максимально просто – поля для “сырого” текста, его эмбеддинга и поле для метаданных. Из интересного в нём, только то, что для Алхимии класс Vector должен быть импортирован именно из pgvector.sqlalchemy. Ну и мапится он в numpy массив, т.к. Vector заточен под работу именно с numpy. Метаданные хранятся в базе данных в виде JSONB, который, как я понял лучше оптимизирован под поиск и индексацию.
Сделаем ещё один подготовительный шаг – создадим настройки для будущего приложения. В папке core создадим файл settings.py. В нем создаем две Pydantic модели – одну для хранения настроек базы данных, другую – для нашего сервиса. В настройки базы данных дополнительно прописываем свойство, которое возвращает готовый url для доступа к базе данных
from pathlib import Path
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict
class ServiceConfig(BaseModel):
host: str
port: int
class PGVectorConfig(BaseModel):
name: str
user: str
password: str
host: str
port: int
embedding_dim: int
@property
def db_url(self) -> str:
return (
f'postgresql asyncpg://{self.user}:{self.password}@'
f'{self.host}:{self.port}/{self.name}')
Теперь добавим класс, загружающий настройки из .env файла в проект
class Settings(BaseSettings):
db: PGVectorConfig
service: ServiceConfig
model_config = SettingsConfigDict(
env_file=Path(__file__).resolve().parent.parent / '.env',
env_file_encoding='utf-8',
extra='ignore',
case_sensitive=False,
env_nested_delimiter='__',
)
settings = Settings()
Основная магия настроек кроется в параметре env_nested_delimiter='__'. Он позволяет автоматически парсить .env файл, при условии, что его структура выполнена по правилу:
{Имя поля Settings}__{Имя поля модели}={Значение}
Например, запись DB__USER=user будет доступна через settings.db.user
Настройки готовы. Осталось добавить импорт необходимых классов в __init__.py у пакетов для удобства использования
# core__init__.py
from .settings import settings
# db_models__init__.py
from .base import Base
from .documents import Document
__all__ = ['Base', 'Document'] # Для упрощения импорта в Alembic
Ну что, подготовительные работы закончены, время миграции!

Для начала создадим папку и настройки миграции командой
alembic init -t async migrations
Ключ -t async указывает, что нужна именно асинхронная версия. Имя папки (migrations) можно задать на свой вкус. Если всё прошло успешно, то в корне проекта увидим папку migrations и файл alembic.ini.
Далее надо указать alembic связи к базе данных и Алхимии. В файл migrationsenv.py необходимо добавить:
-
импорт наших моделей (
import db_models). Важно импортировать все модели, которые хотим мигрировать -
путь к нашей БД (
config.set_main_option("sqlalchemy.url", settings.db.db_url)) -
обязательно прокинуть в
alembicинформацию о наших таблицах для автоматической генерации кода миграции.
Учитывая, что все (одна) таблицы наследуются от базовой, то вся нужная Алембику информация хранится в Base.metadata. Собственно, прописываем target_metadata = Base.metadata и едем дальше. Правда надо учесть, что это сработает только если модели были импортированы (см. второе перечисление выше).
Следующая команда генерирует миграции:
alembic revision --autogenerate -m "init"
Под ключем -m пишем информационное сообщение на память. В результате выполнения команды получаем код миграции. Он расположен в папке versions и имеет название из набор_буквоцифр_init.py.
Следующим шагом необходимо доработать файл с кодом миграций. Открываем, добавляем в начало import pgvector и… Всё. Доработка закончена. Это необходимо делать для каждой миграции, в которой есть тип данных Vector. Применяем миграции:
alembic upgrade head
Готово. На всякий случай убедимся, с помощью Бобра, что таблица успешно создалась.
Подключение к базе данных
Создадим файл db_modelsdatabase.py (согласен, название и расположние не логичные, но ничего не придумал лучше)
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from core import settings
engine = create_async_engine(url=settings.db.db_url)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
Этими двумя (не считая импортов) строчками мы создали подключение к БД и фабрику сессий. Также, для простоты работы создадим кастомный контекстный менеджер
@asynccontextmanager
async def transaction(
isolation_level: str | None = None,
) -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
if isolation_level:
await session.connection(
execution_options={'isolation_level': isolation_level}
)
async with session.begin():
yield session
Данный контекстный менеджер гарантирует, что сессия всегда будет закрыта, а транзакция — завершена (commit при успехе или rollback при ошибках). Бонусом, контекстный менеджер позволяет задать уровень изоляции. Добавляем наш менеджер в __init__.py и переходим к созданию репозитория
Создание репозитория
Паттерн Репозиторий — это структурный паттерн проектирования, который создаёт слой абстракции между бизнес-логикой приложения и источником данных (базой данных, API, файлами).
Минутка теории закончена. Переходим к созданию. Репозиторий так-же будет из двух частей – базовой, реализующей CRUD методы, и специализированной под нашу таблицу, реализующую поиск.
Начнем с базового класса. Создадим пакет repositories (каталог и init), в нем файл base.py
from typing import Generic, TypeVar
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from db_models import Base
ModelType = TypeVar('ModelType', bound=Base)
CreateSchemaType = TypeVar('CreateSchemaType', bound=BaseModel)
UpdateSchemaType = TypeVar('UpdateSchemaType', bound=BaseModel)
class BaseRepository(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, session: AsyncSession, model: type[ModelType]):
self.session = session
self.model = model
Так как в перспективе класс должен работать с несколькими таблицами, воспользуемся дженериками:
-
ModelType– тип для ORM модели, унаследованной отdb_models.Base -
CreateSchemaType/UpdateSchemaType– типы для схем валидации данных от API, унаследованные отpydantic.BaseModel
Перед реализацией стандартных CRUD методов необходимо решить один вопрос – API передаёт векторы как list[float] (JSON формат), тогда как наша база данных ожидает np.ndarray. Напишем простенький конвертор данных, который при наличии поля embedding в словаре данных преобразует его к необходимому формату.
@staticmethod
def _normalize_embedding_format(data: dict) -> dict:
if 'embedding' in data and data['embedding']:
data['embedding'] = np.array(data['embedding'], dtype=np.float32)
return data
Далее реализуем CRUD методы:
CRUD репозиторий
Метод Create
async def create(self, schema: CreateSchemaType) -> ModelType:
instance = self.model(**self._normalize_embedding_format(schema.model_dump()))
self.session.add(instance)
await self.session.flush()
return instance
Метод принимает на вход Pydantic модель, преобразует её в словарь (с преобразованием типа поля embedding), добавляет в сессию. Вызов flush() нужен для получения значений самогенерируемых полей (id и время)
Методы Read
Для чтения из базы реализуем два метода – получение конкретного документа по полю id и списка документов (с пагинацией)
async def get(self, doc_id: UUID) -> ModelType | None:
return await self.session.get(self.model, doc_id)
async def get_all(self, skip: int = 0, limit: int = 100) -> list[ModelType]:
query = select(self.model).offset(skip).limit(limit)
result = await self.session.execute(query)
return list(result.scalars().all())
Метод get использует стандартный метод SQLAlchemy, реализующий полусение данных по первичному ключу. Для получения списка документов сначала формируем SELECT запрос, добавляя в него начальное смещение OFFSET и ограничение LIMIT. Далее отправляем запрос к БД и преобразуем результаты в список.
Метод Update
async def update(self, doc_id: UUID, schema: UpdateSchemaType) -> ModelType | None:
update_data = schema.model_dump(exclude_unset=True)
if not update_data:
return await self.get(doc_id)
query = (
update(self.model)
.where(self.model.id == doc_id)
.values(**self._normalize_embedding_format(update_data))
.returning(self.model)
)
result = await self.session.execute(query)
return result.scalar_one_or_none()
Из интересного в этом методе – при преобразовании Pydantic модели в словарь надо обязательно указать параметр exclude_unset=True, чтобы отсутствующие поля не заменялись на None и не стирали данные.
Метод Delete
async def delete(self, doc_id: UUID) -> bool:
query = delete(self.model).where(self.model.id == doc_id)
result = await self.session.execute(query)
return getattr(result, 'rowcount', 0) > 0
Метод пытается удалить документ по id. Возвращает True, если удаление успешно (result['rowcount'] содержит число удалённых строк)
Бонус метод
Дополнительно решил добавить метод проверки существования документа
async def exists(self, doc_id: UUID) -> bool:
query = select(self.model).where(self.model.id == doc_id).exists()
result = await self.session.execute(select(query))
return result.scalar()
Базовые методы готовы.
Pydantic модели для валидации данных
В базовом классе репозитория мы прописали две типомодели – для создания и изменения документа. Опишем их в файле schemasdocument.py:
from typing import Annotated, Any
from pydantic import BaseModel, Field
from core import settings
class DocumentCreate(BaseModel):
content: Annotated[str, Field(..., description='Содержимое документа')]
meta_data: Annotated[
dict[str, Any], Field(default_factory=dict, description='Метаданные документа')]
embedding: Annotated[
list[float],
Field(..., description=f'Вектор документа размерности {settings.db.embedding_dim}')]
class DocumentUpdate(BaseModel):
content: Annotated[str | None, Field(None, description='Содержимое документа')]
meta_data: Annotated[
dict[str, Any] | None, Field(None, description='Метаданные документа')]
embedding: Annotated[
list[float] | None,
Field(None, description=f'Вектор документа размерности {settings.db.embedding_dim}')]
В моделях описываем только изменяемые поля. Формально, модели отличаются только тем, что у модели для создания документа все параметры обязательные а у Update модели – опциональные. Учитывая, что документы могут создаваться без метаданных, у поля meta_data модели создания документа задано значение по умолчанию (для безусловного создания модели, соответствующей схеме БД).
Минутка занимательной теории:
В Pydantic
Field(default=...)используется для неизменяемых типов (числа, строки), но для изменяемых (списки, словари) поле инициализируется на уровне класса и будет общим для всех экземпляров модели. В свою очередьField(default_factory=...)принимает функцию или вызываемый объект, который выполняется при каждом создании модели.
Document репозиторий
Создадим файл repositoriesdocument.py
from sqlalchemy.ext.asyncio import AsyncSession
from db_models import Document
from schemas import DocumentCreate, DocumentUpdate
from .base import BaseRepository
class DocumentRepository(BaseRepository[Document, DocumentCreate, DocumentUpdate]):
def __init__(self, session: AsyncSession):
super().__init__(session, Document)
Пока всё просто – наследуемся от базового класса, передавая при этом ему наши модели Алхимии и валидации. Теперь время определиться с методами. Так как это векторная база, то однозначно нужен семантический поиск. Далее можно добавить извлечение по метаданным (например, для получения всех чанков одного документа) и удаление по метаданным (тоже, например, грохнуть все чанки документа). Ну и умные люди посоветовали метод подсчета добавить, также по метаданным.
Далее сами методы. Фактически, они все однотипные – к базовому запросу (SELECT или DELETE) добавляем по-очереди WHERE со всеми парами ключ-значение из фильтра, добавляем LIMIT (если есть) и делаем запрос к базе. Эту часть можно вынести в общий метод.
async def _execute_filtered_query(
self,
query: GenerativeSelect | Delete,
meta_filter: dict | None = None,
limit: int | None = None
):
if meta_filter:
for key, value in meta_filter.items():
query = query.where(Document.meta_data[key].astext == str(value))
if limit:
query = query.limit(limit)
result = await self.session.execute(query)
return result
Из самих методов интересен только семантический поиск. В нем в качестве параметра сортировки передаётся функция cosine_distance, в которую передаём целевой вектор, преобразованный в np.array
async def semantic_search(
self,
embedding: list[float],
meta_filter: dict | None = None,
limit: int = 10
) -> list[Document]:
query = select(Document).order_by(
Document.embedding.cosine_distance(np.array(embedding, dtype=np.float32)))
result = await self._execute_filtered_query(query=query, meta_filter=meta_filter, limit=limit)
return list(result.scalars().all())
Остальные методы до безобразия одинаковы
async def get_by_metadata(self, meta_filter: dict, limit: int = 100) -> list[Document]:
query = select(Document)
result = await self._execute_filtered_query(query=query, meta_filter=meta_filter, limit=limit)
return list(result.scalars().all())
async def delete_by_metadata(self, meta_filter: dict) -> int:
query = delete(Document)
result = await self._execute_filtered_query(query=query, meta_filter=meta_filter)
return getattr(result, 'rowcount', 0)
async def count_by_metadata(self, meta_filter: dict) -> int:
query = select(func.count()).select_from(Document)
result = await self._execute_filtered_query(query=query, meta_filter=meta_filter)
return result.scalar_one()
delete_by_metadata, в отличие от базового удаления по id, возвращает количество удаленных записей
Ну что, поздравляю нас, база готова!! Осталось проверить работает ли всё это, как задумано. Для этого напишем пару тестов
Тестирование репозитория
Начнем с самого простого – создания документа. Напишем небольшой тест прямо в main.py файле:
import asyncio
from db_models import transaction
from repositories import DocumentRepository
from core import settings
from schemas import DocumentCreate
async def main():
async with transaction() as session:
repo = DocumentRepository(session)
print('Создание документа. ', end='')
doc_data = DocumentCreate(
content='Тестовый документ',
embedding=[0.1, 0.2, 0.3, 0.4] * (settings.db.embedding_dim // 4),
meta_data={'author': 'tester', 'type': 'test'}
)
doc = await repo.create(doc_data)
print(f'Создан документ: ID {doc.id}; Текст "{doc.content}"')
if __name__ == '__main__':
asyncio.run(main())
Кратко о коде – получаем сессию, создаем объект репозитория и передаём в методе create репозитория Pydantic модель с какими-то данными.
Запускаем наш чудо-тест:
python main.py
На всякий случай проверим Бобром:
Ура! Теперь тест посложнее. Проверим, сможем ли мы удалить только что созданный документ
async def main():
async with transaction() as session:
repo = DocumentRepository(session)
doc_id = UUID('941f9ae4-6a25-40fb-a1b4-fd8db245c53e')
print('Проверка существования. ', end='')
exists = await repo.exists(doc_id)
print(f'Результат проверки: {exists}')
print('Удаление документа. ', end='')
deleted = await repo.delete(doc_id)
print(f'Результат удаления: {deleted}')
print('Проверка существования после удаления. ', end='')
exists = await repo.exists(doc_id)
print(f'Результат проверки: {exists}')
В переменную doc_id скопируем ID только что созданного документа. Далее делаем следующее: проверяем, есть ли вообще данный документ, затем пробуем его удалить и вновь проверяем существование. Запускаем
Ну и напоследок проверим работу самого важного метода для векторного хранилища – семантического поиска. Для этого создадим в безе данных три фиктивных документа с разными эмбеддингами и попробуем найти один из них, немного изменив целевой эмбеддинг
async def main():
async with transaction() as session:
repo = DocumentRepository(session)
for i in range(3):
doc = await repo.create(DocumentCreate(
content=f'Поисковый тест номер {i}',
embedding=[0.1 * i, 0.2, 0.3, 0.4] * (settings.db.embedding_dim // 4),
meta_data={'tag': f'test-{i}', 'group': 'A'}
))
print(
f'Создан документ: ID {doc.id}; '
f'Текст "{doc.content}"; '
f'Тег "{doc.meta_data["tag"]}"'
)
print('Векторный поиск (похожий на test-2). ', end='')
query_vector = [0.19, 0.2, 0.3, 0.4] * (settings.db.embedding_dim // 4)
similar = await repo.semantic_search(query_vector, limit=2)
print(
f'Найдено {len(similar)} шт c тегами',
','.join(f'"{doc.meta_data["tag"]}"' for doc in similar)
)
print('Удаление по метаданным. ', end='')
result = await repo.delete_by_metadata({'group': 'A'})
print(f'Удалено {result} записей.')
В конце теста чистим созданные документы, заодно проверяя как работает удаление по метаданным. Сохраняем, запускаем, смотрим:
Ну что, базовые механики репозитория проверены, осталось реализовать API….
Реализация API
API, как понятно из заголовка, будет на FastAPI – лучшем асинхронном фреймворке всех времён и народов (но это не точно). Добавим его в виртуальное окружение
uv add fastapi==0.128.7 uvicorn==0.40.0
Далее нам необходимо дополнить наши Pydantic схемы. Пока что у нас есть схемы создания и обновления записи. Теперь пришло время добавить схемы для проверки входных и выходных данных API. Для входных данных дополнительно необходимы схемы для поиска и фильтру по метаданным, а для выходных – полная схема документа. Собственно, вот они:
class DocumentResponse(BaseModel):
id: Annotated[UUID, Field(..., description='Идентификатор документа')]
content: Annotated[str, Field(..., description='Содержимое документа')]
meta_data: Annotated[
dict[str, Any],
Field(default_factory=dict, description='Метаданные документа')]
embedding: Annotated[
list[float],
Field(..., description=f'Вектор документа размерности {settings.db.embedding_dim}')]
created_at: Annotated[
datetime, Field(..., description='Дата и время создания документа')]
updated_at: Annotated[
datetime,
Field(..., description='Дата и время последнего обновления документа')]
model_config = ConfigDict(from_attributes=True)
class SemanticSearchRequest(BaseModel):
embedding: Annotated[
list[float],
Field(..., description=f'Вектор документа размерности {settings.db.embedding_dim}')]
meta_filter: Annotated[
dict | None, Field(default=None, description='Фильтр по метаданным')]
limit: Annotated[
int, Field(default=10, ge=1, le=100, description='Количество результатов')]
class MetadataFilterRequest(BaseModel):
meta_filter: Annotated[dict, Field(..., description='Фильтр по метаданным')]
limit: Annotated[
int, Field(default=100, ge=1, le=1000, description='Количество результатов')]
Из интересного – модель DocumentResponse используется для преобразования результата, полученного от базы данных. Для обеспечения возможности работы Pydantic с экземплярами данных SQLAlchemy, модели необходимо задать параметр from_attributes=True. Это критически важно для автоматической сериализации ответов.
Переходим к API. При написании API будем придерживаться требования о версионировании, а конкретно зададим эндпоинту префикс api/v1/.... Для этого создадим каталоги api и v1 и в каждом создадим файл __init__.py. В них пропишем:
# api__init__.py
from fastapi import APIRouter
from .v1 import router as v1_router
router = APIRouter()
router.include_router(v1_router, prefix='/v1')
# apiv1__init__.py
from fastapi import APIRouter
from .crud import router as documents_crud_router
from .search import router as documents_search_router
router = APIRouter()
router.include_router(documents_crud_router, prefix='/documents')
router.include_router(documents_search_router, prefix='/documents/search')
Зачем это надо – если вдруг решим сделать прям вообще другой интерфейс у эндпоинта, то создаём папку v2, в api__init__.py добавляем router.include_router(v2_router, prefix='/v2') и вуаля – у нас готово версионирование API: новая версия работает параллельно со старой, не ломая обратную совместимость.
APIшку CRUD разберу на базе GET запроса документов (всех и одного), так как там больше всего кода
@router.get(path='/', response_model=list[DocumentResponse])
async def list_documents(params: Annotated[PaginationParams, Query()]):
async with transaction() as session:
repo = DocumentRepository(session)
docs = await repo.get_all(**params.model_dump())
return docs
@router.get(path='/{doc_id}', response_model=DocumentResponse)
async def get_document(doc_id: UUID):
async with transaction() as session:
repo = DocumentRepository(session)
doc = await repo.get(doc_id=doc_id)
if not doc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='Document not found')
return doc
Эти два метода (да и остальные тоже) очень похожи – извлекаем из фабрики сессию для общения с БД, создаём объект нашего репозитория, получаем данные и отправляем пользователю. Разнообразия добавляет наличие пагинации в методе list_documents и возврат 404 в методе list_documents. Остальные методы реализовываются аналогично.
Теперь API для поиска документов. Эндпоинтов в нём будет четыре – семантический поиск, поиск/удаление по метаданным и поиск схожих документов.
Первые три просто вызывают соответствующие методы репозитория. Вот, например, метод для семантического поиска:
@router.post(
path='/semantic',
response_model=list[DocumentResponse],
summary='Семантический поиск по векторному представлению'
)
async def semantic_search(request: SemanticSearchRequest):
async with transaction() as session:
repo = DocumentRepository(session)
results = await repo.semantic_search(
embedding=request.embedding,
meta_filter=request.meta_filter,
limit=request.limit
)
return results
Эндпоинт для поиска похожих документов самую малость посложнее:
@router.get(
path='/similar/{document_id}',
response_model=list[DocumentResponse],
summary='Поиск похожих документов',
)
async def find_similar(
document_id: UUID,
limit: int = Query(default=10, ge=1, le=100, description='Количество результатов')
):
async with transaction() as session:
repo = DocumentRepository(session)
source = await repo.get(document_id)
if not source:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Document not found'
)
results = await repo.semantic_search(
source.embedding.tolist(),
limit=(limit 1)
)
return [doc for doc in results if doc.id != document_id]
В данном эндпоинте сначала проверяем существует ли документ. Если сужествует, то вызываем семантический поиск, где в качестве эмбеддинга передаём эмбеддинг найденого документа. Далее возвращаем все найденные документы, кроме ключевого.
Остался последний штрих – собственно создать наше FastAPI приложение. Создадим в корне проекта файл main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from api import router as api_router
from db_models.database import engine
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
await engine.dispose()
app = FastAPI(lifespan=lifespan)
app.include_router(api_router, prefix='/api')
@app.get('/health')
def health():
return {'status': 'ok'}
Для начала, не считая импортов, реализуем менеджер контекста жизненного цикла для FastAPI приложения, позволяющий проводить некоторые действия при запуске и завершении работы приложения. В данном случае при запуске (до оператора yield) ничего не делаем, а при завершении выполняем engine.dispose() для обеспечения корректного закрытия соединения с базой данных.
Далее создаём само приложение FastAPI, прокидываем в него lifespan и вешаем на него ранее созданные эндпоинты.
Ну и вишенкой добавим к нашему приложению бесполезный (пока что!) эндпоинт /health. В дальнейшем на него можно повесить что-нибудь, позволяющее пингануть живой ли наш сервис. Но это как-нибудь потом.
А пока что, будем считать API более-менее готовым. Проверим, что эндпоинты есть и их параметры соответствуют задуманным. Для этого запустим сервер с нашим API и узрим магию FastAPI – автоматическое документирование.
uvicorn main:app
Далее переходим по ссылке 127.0.0.1:8000/docs и наблюдаем замечательную страницу с документацией.
Теоретически, можно уже из этой страницы потестить наш API, повыдовав в него разные запросы, благо FastAPI это всё замечательно реализовал за нас. Но я всё-таки решил добавить сюда блок тестов API.
Тестирование API
На самом деле основные проверки мы уже реализовали в части про тестирование репозитория. Их надо немного дополнить неохваченными методами репозитория а потом повторить как тесты API. Описывать все тесты не буду (в конце дам ссылку на Гит, там всё будет), они очень однообразные. Остановлюсь на основных моментах с которыми были проблемы.
Для начала, естественно, надо создать пакет для тестов и прописать конфиг для pytest. Расположение конфига зависит от наличия в проекте файла pyproject.toml. В конфиге укажем, что тесты асинхронные и дадим ссылку на наш тест-пакет:
# pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
# Или pytest.ini, если pyproject.toml нет
asyncio_mode = auto
testpaths = tests
Дальше необходимо создать файл testsconftest.py для фикстур наших тестов :
@pytest_asyncio.fixture(scope='session')
def test_engine():
engine = create_async_engine(settings.db.db_url, poolclass=NullPool, echo=False,)
yield engine
asyncio.run(engine.dispose())
@pytest_asyncio.fixture(scope='function')
async def test_session(test_engine):
async_session = async_sessionmaker(test_engine, expire_on_commit=False)
async with async_session() as session:
yield session
@pytest_asyncio.fixture(scope='function')
def client():
with TestClient(app) as client:
yield client
Первые три фикстуры необходимы для функционирования тестов. Первая фикстура создаёт собственное подключение к БД для тестов. Без этой фикстуры возникает конфликт циклов событий (но это не точно) и тесты падают с ошибкой RuntimeError: Event loop is closed. Ну раз есть свой engine, то надо и тестовую фабрику сессий (вторая фикстура). В ней никаких begin() и commit(), база данных не должна замусорится после тестов. Третья фикстура создает клиент, который позволяет отправлять HTTP-запросы. Клиент предоставляется самим фреймворком FastAPI Едем дальше. В ходе тестов мы будем создавать документы. Реализуем шаблон документа:
@pytest_asyncio.fixture(scope='function')
def base_document():
def _create_document(doc_number: int) -> dict:
return {
'content': f'Тестовый документ {doc_number}',
'embedding': [0.1 * doc_number, 0.2, 0.3, 0.4] * (settings.db.embedding_dim // 4),
'meta_data': {'author': f'tester{doc_number}', 'group': 'tests'}
}
return _create_document
Вложенная в фикстуру функция позволяет параметризировать наш документ при создании, а именно задать его номер. От номера зависит значение эмбеддинга и одно из полей для фильтрации. Всё это пригодится для тестирования фильтрации по метаданным и семантического поиска.
Теперь фикстура создания документа. Она также параметрирированная (для занания номера документа) и, фактически, копирует метод create репозитория. Фикстура использует тестовую сессию и коммитит изменения, чтобы API увидел запись в базе данных:
@pytest_asyncio.fixture(scope='function')
async def create_document(test_engine, base_document):
async def _create_document(doc_number: int) -> UUID:
async_session = async_sessionmaker(test_engine, expire_on_commit=False)
async with async_session() as session:
doc_data = DocumentCreate(**base_document(doc_number))
doc = Document(**normalize_embedding_format(doc_data.model_dump()))
session.add(doc)
await session.commit()
return doc.id
return _create_document
Аналогично с фикстурой удаления документа – на вход она получает ID документа, создаёт и выполняет запрос на удаление
@pytest_asyncio.fixture(scope='function')
async def delete_document(test_engine):
async def _delete_document(doc_id: UUID | str):
if isinstance(doc_id, str):
doc_id = UUID(doc_id)
async_session = async_sessionmaker(test_engine, expire_on_commit=False)
async with async_session() as session:
query = delete(Document).where(Document.id == doc_id)
await session.execute(query)
await session.commit()
return _delete_document
На основании этих двух фикстур можно реализовать ещё одну (на самом деле две), которая будет автоматически создавать документ, давать с ним работать и удалять его:
@pytest_asyncio.fixture(scope='function')
async def create_delete_one_document(create_document, delete_document):
doc_id = await create_document(1)
yield doc_id
await delete_document(doc_id)
Для поиска ещё есть фикстура create_delete_five_documents которая пять раз в цикле вызывает создание и удаление документа.
С фикструрами всё. Для примера приведу код одного из тестов (обновление документа через репозиторий):
@pytest.mark.asyncio
async def test_update_document(create_delete_one_document, test_session):
repo = DocumentRepository(test_session)
doc_id = create_delete_one_document
update_data = DocumentUpdate(content='Изменённый документ')
updated = await repo.update(doc_id, update_data)
assert updated.content == 'Изменённый документ'
assert updated.id == doc_id
Всё просто – у нас есть документ, созданный фикстурой. Мы берём его ID, создаём объект репозитория, обновляем одно из полей документа и проверяем, что изменения прошло. Единственный интересный момент в этом тесте – фикстуры должны располагаться именно в таком порядке. Иначе test_session через репозиторий блокирует доступ к нашему документу, фикстура не может его удалить и всё зависает.
Остальные тесты, если вдруг кому интересно, можно глянуть в репозитории (который Гит).
Упаковка в Докер
Вот мы и добрались до последнего пункта плана – упаковки в Докер. Для этого надо собрать контейнер с нашим сервисом, реализовать compose и не забыть про миграции.
Начнем с образа сервиса. Создадим в корне файл Dockerfile:
FROM python:3.11-alpine
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Что делаем:
-
собираем образ на базе alpine сборки Python 3.11
-
копируем файл с зависимостями и устанавливаем их (отдельно от кода, для создания отдельного Docker-слоя
-
копируем файлы проекта
-
запускаем uvicorn сервер. Ключ –host 0.0.0.0 необходим, чтобы сервер слушал все сетевые интерфейсы и был доступен извне контейнера (с хоста)
Далее необходимо доработать docker-compose.yaml файл, созданный в самом начале. Добавим в него наш сервис
api_service:
build: .
env_file:
- .env
environment:
- DB__HOST=postgres
ports:
- "${SERVICE__PORT}:8000"
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
Для сервиса собираем образ, передаём в контейнер переменные из .env, подменив DB__HOST на значение для контейнера, пробрасываем порт нашего сервиса наружу.
Ну и последний контейнер для нашего docker-compose.yaml – контейнер миграций. Всё, что он должен сделать – это дождаться загрузки базы данных, после чего запустить команду выполнения миграции и остановиться:
migration:
build: .
env_file:
- .env
environment:
- DB__HOST=postgres
command: >
sh -c "alembic upgrade head && echo 'Migrations completed'"
depends_on:
postgres:
condition: service_healthy
restart: "no"
Выполняем docker compose up -d иииии… Всё. Осталось проверить, что сервис теперь функционирует и в Докере.
Можно перейти по адресу http://127.0.0.1:8000/docs и выполнить GET запрос (кнопки Try it out -> Execute) или воспользоваться утилитой curl
curl -X 'GET'
'http://127.0.0.1:8000/api/v1/documents/?skip=0&limit=100'
-H 'accept: application/json'
В любом случае, так как база пустая, должны получить код 200 и пустой массив
Послесловие
Ну что, мы это сделали!
База поднята (Бобёр подтверждает), вектора создаются и читаются, тесты зеленые. Надеюсь эта писанина оказалась полезной и для вас. Тут, конечно, много чего не хватает – как минимум авторизации и индексации. Может как-нибудь допилю.
Исходный код проекта: тык
Буду рад обратной связи в комментариях. Спасибо за чтение! Пойду я пожалуй, отмечу завершение статьи… чаем.
Всем удачи и меньше багов!


