Когда речь заходит об очередях сообщений, на ум сразу приходят гиганты вроде RabbitMQ, Apache Kafka или AWS SQS. Но что делать, когда нужна простая, легковесная и кастомизируемая очередь для внутренних задач одного сервиса, а разворачивать и поддерживать тяжеловесный брокер нецелесообразно? Наша команда столкнулась с такой задачей при разработке модуля обработки аудио-файлов. Нам нужна была очередь для управления заданиями на конвертацию, но с полным контролем над логикой повторов, приоритетами и мониторингом. Решение — создать свой минималистичный брокер на Node.js. Этот кейс — история о том, как мы спроектировали и реализовали его с нуля.
Требования были специфичны: 1) Асинхронная обработка заданий (jobs) из очереди. 2) Гарантия доставки (at-least-once delivery). 3) Возможность установки приоритета заданию. 4) Механизм повторных попыток (retry) с экспоненциальной задержкой (exponential backoff) в случае ошибки. 5) Простой веб-интерфейс для мониторинга состояния очереди (задания в ожидании, в процессе, завершенные, неудачные). 6) Хранение состояния между перезапусками сервиса. Использовать Redis как очередь (через Bull или аналоги) мы не могли из-за ограничений инфраструктуры. Полноценный RabbitMQ был избыточен. Поэтому мы решили написать свое решение, используя файловую систему как хранилище и Node.js с его event-driven архитектурой как движок.
**Архитектура.** Мы выделили три основных компонента: **Queue (очередь)**, **Worker (воркер)** и **Dashboard (дашборд)**. Queue отвечает за прием заданий, их хранение и выдачу воркерам. Worker — это вычислительный процесс, который берет задание из очереди, выполняет его (в нашем случае — вызывал внешний FFmpeg-процесс для конвертации аудио) и сообщает о результате. Dashboard — простой HTTP-сервер, отдающий статус очереди в реальном времени через Server-Sent Events (SSE).
**Реализация ядра очереди.** Сердцем системы стал класс `FileSystemQueue`. Мы отказались от хранения всего в памяти ради persistence. Каждое задание (job) представляло собой JSON-файл в директории `./queue/pending`. Структура файла: `id`, `type`, `payload` (данные задания), `priority`, `attempts` (количество попыток), `createdAt`. Приоритет реализовали через именование поддиректорий: `./queue/pending/high`, `./queue/pending/normal`. Воркер, запрашивая задание, сначала сканировал директорию `high`, затем `normal`. Для гарантии доставки и конкурентного доступа использовали файловые блокировки (пакет `proper-lockfile`). Когда воркер забирал задание, он перемещал файл в `./queue/active`, создавая lock-файл. После обработки — в `./queue/completed` или `./queue/failed`.
**Воркер и логика повторов.** Воркер был реализован как отдельный Node.js-процесс, запускаемый в нескольких экземплярах (через `cluster` модуль для использования всех ядер CPU). Его алгоритм: 1) Опрос очереди на наличие заданий (интервал 1 секунда). 2) Выбор задания с наивысшим приоритетом. 3) Попытка захвата (lock) и перемещение в `active`. 4) Выполнение бизнес-логики. 5) В случае успеха — перемещение в `completed`. В случае ошибки — увеличение счетчика `attempts`. Если `attempts` меньше максимального (например, 3), задание с вычисленной задержкой (`Math.pow(2, attempts) * 1000 мс`) перемещалось обратно в `pending`. Если попытки исчерпаны — в `failed` с записью ошибки.
**Взаимодействие и API.** Для добавления заданий в очередь мы создали простой REST endpoint `POST /jobs`. Он принимал тип задания и payload, генерировал ID, создавал JSON-файл в соответствующей директории `pending` и немедленно отвечал клиенту `{ jobId: '...', status: 'queued' }`. Это обеспечивало асинхронность. Для получения статуса задания — endpoint `GET /jobs/:id`, который искал файл по всем директориям (`pending`, `active`, `completed`, `failed`) и возвращал его содержимое.
**Дашборд и мониторинг.** Простой интерфейс на Express и EJS отображал количество файлов в каждой директории, что давало представление о размере очереди, нагрузке на воркеры и количестве ошибок. Мы добавили Server-Sent Events (SSE), чтобы данные на странице обновлялись в реальном времени без опросов. Для этого при любом изменении в директориях очереди (через `chokidar` — наблюдатель за файлами) сервер отправлял клиенту событие с новыми цифрами.
**Сложности и решения.** Главной проблемой стала **надежность файловой системы как брокера**. Атомарность операций «взять задание» достигалась только через lock-файлы. Мы добавили механизм «здоровья» (health check) для воркеров: если воркер умирал, его lock-файлы в `active` оставались, блокируя задания. Реализовали cleanup-скрипт, который по крону проверял `active`-задания старше N минут и возвращал их обратно в `pending`, увеличивая счетчик попыток. Вторая проблема — **производительность при большом количестве файлов**. Сканирование тысяч файлов при каждом опросе стало узким местом. Мы ввели простую индексацию в памяти: при старте очередь считывала список файлов в объект Map, а все изменения (добавление, перемещение) синхронизировали и этот Map. Файловая система оставалась источником истины, а Map — быстрым кэшем.
**Итоги.** Наш легковесный брокер успешно обрабатывал до 1000 заданий в час, что полностью покрывало потребности модуля. Мы получили полный контроль над логикой, минимальные зависимости (только Node.js) и глубокое понимание принципов работы очередей. Ключевые выводы: 1) Иногда «велосипед» имеет смысл, если требования узки и специфичны. 2) Файловая система может быть viable storage для простых очередей с moderate нагрузкой. 3) Самая сложная часть — не обработка, а обеспечение надежности и консистентности в конкурентной среде. Для более высоких нагрузок мы бы рассмотрели embedded БД типа SQLite или LevelDB. Этот опыт стал отличной школой и позволил нам в дальнейшем более осознанно работать с промышленными брокерами, понимая, что скрывается за их абстракциями.
Строим очередь сообщений с нуля: кейс разработки легковесного брокера на Node.js
Детальный технический кейс создания простой, но надежной системы очередей сообщений (job queue) с нуля на Node.js с использованием файловой системы для хранения, реализацией приоритетов, повторных попыток и веб-дашборда.
231
2
Комментарии (8)