pgq. Организация монопольных очередей сообщений
Версия: 3.5.1.
В исходном дистрибутиве установлено по умолчанию: нет.
Связанные компоненты:
pangolin-pgqd
.Схема размещения:
pgq
.
Описание
Модуль предназначен для организации очередей сообщений и состоит из непосредственно расширения pgq
для postgres и демона pangolin-pgqd
.
Полезная нагрузка сообщения может содержать от 2 до 6 текстовых полей. Тип данных text
в данном случае выбран, как наиболее общий базовый тип.
При организации очередей следует избегать большого количества отдельных сообщений меньшего объема в пользу меньшего количества более объемных сообщений. Таблицы сообщений не содержат дополнительных индексов, позволяющих фильтровать условия видимости сообщений, и ориентированы, прежде всего, на скорость вставки. Компенсирующей мерой является штатное разделение таблицы очередей на набор наследуемых таблиц (INHERIT TABLE
), регламентные операции по каждой из которых разделены по времени. По умолчанию, количество наследуемых таблиц равно 3 (может быть скорректировано пользователем до создания очереди).
Доработка
Доработка не проводилась.
Ограничения
Ограничения отсутствуют.
Установка
- SberLinux, РЕД ОС, CentOS
- Astra Linux
- Альт СП
sudo dnf install pangolin-pgq-{version_component}-{OS}.x86_64.rpm
sudo apt install pangolin-pgq-{version_component}_amd64.deb
sudo apt-get install pangolin-pgq-{version_component}-{OS}.x86_64.rpm
Пример заполненной команды:
cd distributive
sudo apt-get -y install pangolin-pgq-3.5.1-altlinux10.x86_64.rpm
Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE
в текущей базе данных:
CREATE EXTENSION pgq;
Настройка
Функции
Перечень функций расширения pgq
, используемых для работы с очередями сообщений:
Функции расширения pgq
Функция | Входные параметры | Выходные параметры | Описание |
|
| - | Создание очереди с заданным именем. Параметры функции:
|
|
| - | Удаление очереди с заданным именем. Параметры функции:
|
|
| - | Задание параметров очереди с заданным именем. Параметры функции:
Допустимые параметры:
Внимание! Количество таблиц в очереди невозможно изменить после создания очереди. Для изменения количества таблиц следует воспользоваться запросом (перед созданием очереди):
|
|
| - | Добавление нового сообщения в очередь. Параметры функции:
При использовании дополнительных параметров должны быть определены все 4 параметра, |
|
| - | Отображение текущей таблицы очереди, в которую будут добавляться записи. Параметры функции:
|
|
| - | Регистрация нового потребителя для очереди Параметры функции:
|
|
| - | Отложенная регистрация нового потребителя для очереди Параметры функции:
|
|
| - | Отмена регистрации потребителя для очереди Параметры функции:
|
|
|
| Получение информации о сеансе сообщений очереди Параметры функции:
|
|
| - | Устаревшая процедура получения информации о состоянии сообщений. Параметры функции:
|
|
|
| Расширенное получение информации о сеансе сообщений. Дополнительные параметры сеанса:
Параметры функции:
|
|
|
| Получение сообщений из сеанса сообщений. Параметры функции:
|
|
|
| Получение сообщений из сеанса сообщений с помощью курсора с заданным именем и, опционально, дополнительным условием. Параметры функции:
|
|
| - | Повторная постановка в очередь сообщения, с заданием интервала повторного получения (относительное или абсолютное время). Параметры функции:
|
|
| - | Повторная постановка всего сеанса сообщений в очередь по истечению интервала. Параметры функции:
|
|
| - | Окончание обработки сеанса сообщений. Параметры функции:
|
| - |
| Получение статистической информации об очередях. Параметры функции:
|
|
|
| Получение информации о состоянии потребителя: отставание, активность и очередность. Параметры функции:
|
| - | - | Получение версии расширения |
|
|
| Получение статистической информации о сеансе сообщений. Параметры функции:
|
|
| - | Формирование SQL-запроса для извлечения сообщений из сеанса. Параметры функции:
|
|
| - | Получение списка таблиц, где могут находиться сообщения сеанса. Параметры функции:
|
|
| - | Интерфейс ручного управления сообщениями. Параметры функции:
|
|
|
| Вспомогательная функция для расчета следующей позиции обработки сообщений. Параметры функции:
|
|
| - | Регистрация контрольной точки обработки сообщений. При использовании внешнего сервиса требуется установить параметр Переменные функции:
|
| - | - | Повторная постановка в очередь помеченных сообщений. Выполняется частично и должна вызываться до получения кода возврата 0. |
|
| - | Первый шаг обслуживания: очистка таблиц очереди, не содержащих необработанных сообщений. Выполняется с блокировкой. Параметры функции:
|
| - | - | Второй шаг обслуживания: обновление информации об очереди. |
| - | - | Формирование списка таблиц очереди, подлежащих вакуумированию. |
| - |
| Формирование списка регламентных операций для обслуживания очереди. Параметры функции:
|
|
| - | Назначение прав доступа на таблицы очереди для роли Параметры функции:
|
|
| - | Настройка параметров хранения для таблиц очереди: Параметры функции:
|
|
| - | Принудительная регистрация контрольной точки обработки сообщений. Параметры функции:
|
|
| - | Аналог Параметры функции:
|
|
| - | Аналог Параметры функции:
|
|
| - | Форматирование полного имени объекта схемы и таблицы (для совместимости). Параметры функции:
|
Триггеры
Одним из ключевых механизмов взаимодействия таблиц с очередями являются триггерные функции, автоматически добавляющие события в очередь при изменении данных в таблицах.
Триггерные функции в pgq
служат для регистрации изменений в таблицах (INSERT
, UPDATE
, DELETE
) и формирования событий, которые затем обрабатываются через очередь.
Перечень триггерных функций расширения pgq
, используемых для работы с очередями сообщений:
Триггерные функции pgq
Триггер | Примеры использования | Описание |
|
| Добавление сообщений в очередь, имя которой задается первым аргументом триггерной функции. Сообщение кодируется в формате |
|
| Добавление сообщений в очередь, имя которой задается первым аргументом триггерной функции. Сообщение кодируется в формате |
|
| Добавление сообщений в очередь, заданную первым параметром триггерной функции. Сообщение кодируется в формате форматированного SQL ( |
Триггеры pgq
можно настраивать с помощью аргументов, передаваемых в объявлении триггера:
Аргументы триггерных функций
Аргумент | Назначение |
| Пропуск выполнения операции. Поведение триггерной функции для |
| Исключение указанных полей из JSON |
| Явное указание полей, содержащих |
| Добавление предыдущего состояния строки ( |
| Переопределение значения одного из полей сообщения ( |
| Условие фильтрации: сообщение добавляется в очередь только если |
Сообщение, добавляемое в очередь с помощью триггерных функций имеет следующую структуру:
ev_type
— тип операции:I
(INSERT
),U
(UPDATE
),D
(DELETE
), с указанием первичного ключа через двоеточие:
;ev_data
— содержит значения полей таблицы в формате JSON;ev_extra1
— мя таблицы, из которой поступило сообщение;ev_extra2
— (опционально) значения полей до изменения (при наличииOLD.*
, используется с аргументомbackup
).
Таблицы
Расширение pgq
сопровождается рядом системных таблиц, которые обеспечивают работу этой механики:
Системные таблицы pgq
Таблица | Структура | Описание |
|
| Список зарегистрированных потребителей очередей (consumer) |
| Cтруктура таблицы
| Шаблон очереди сообщений. При создании очереди на его основе создаются таблицы событий. По умолчанию — 3 наследуемые таблицы. |
| Cтруктура таблицы
| Таблица зарегистрированных очередей сообщений |
| Cтруктура таблицы
| Таблица, содержащая информацию о получателях потребителей на очереди |
| Cтруктура таблицы
| Таблица, фиксирующая контрольные точки времени — метки для синхронизации обработки событий |
Роли
В расширении pgq
используются определенные роли, обеспечивающие контроль доступа к очередям, событиям и процессу их обработки:
Системные таблицы pgq
Роль | Описание |
| Администратор расширения. Член ролей |
| Роль потребителя (consumer). Позволяет создавать сеанс сообщений, читать и управлять сообщением |
| Роль отправителя очереди (provider). Позволяет помещать сообщения в очередь, в том числе с помощью триггеров |
Настройка менеджера очередей pangolin-pgqd
Менеджер очереди является системным сервисом, предоставляющим элементы синхронизации (tick).
При запуске сервиса происходит:
- подключение к сконфигурированной сущности;
- поиск установленного расширения по всем базам данным, доступным для подключения;
- вызов функции
pgq.ticker()
c заданной периодичностью; - вызов функций
pgq.maint_rotate_tables_step1()
,pgq.maint_rotate_tables_step2()
c заданной периодичностью; - вызов функции
pgq.maint_retry_events()
с заданной периодичностью.
Изменение конфигурации pangolin-pgqd
производится с использованием конфигурационного файла pangolin-pgqd.ini
:
[pgqd]
# where to log
logfile = ~/log/pangolin-pgqd.log
# pidfile
pidfile = ~/pangolin-pgqd/pangolin-pgqd.pid
## optional parameters ##
# libpq connect string without dbname=
#base_connstr = host=<IP-адрес> port=<Порт> user=<Имя пользователя> password=<Пароль>
# startup db to query other databases
#initial_database = template1
# limit ticker to specific databases
#database_list =
# log into syslog
syslog = 1
syslog_ident = pgqd
## optional timeouts ##
# how often to check for new databases
check_period = 60
# how often to flush retry queue
retry_period = 30
# how often to do maintentance
#maint_period = 120
# how often to run ticker
ticker_period = 1
Описание конфигурационных параметров:
Конфигурационные параметры
Конфигурационный параметр | Значение по умолчанию | Описание |
|
| Путь к лог-файлу |
|
| Путь к |
| - | Строка подключения к сущности ( |
|
| База данных для автоматического подключения (при отсутствии параметра |
| - | Список баз данных для подключения. Если не задан — производится автоматический поиск |
| 1 | Признак логирования действий в |
|
| Идентификатор |
| 60 | Периодичность поиска новых баз данных (в случае автоматического поиска), секунды |
| 30 | Периодичность поиска сообщений, повторно добавляемых в очередь, секунды |
| 120 | Периодичность проведения регламентных операций, секунды |
| 1 | Периодичность синхронизации очередей, секунды |
Доступные параметры командной строки при использовании расширения:
./pgqd --help
usage: pgq-ticker [switches] config.file
Switches:
-v Increase verbosity
-q No output to console
-d Daemonize
-h Show help
-V Show version
--ini Show sample config file
-s Stop - send SIGINT to running process
-k Kill - send SIGTERM to running process
-r Reload - send SIGHUP to running process
Где:
-v
— повышение уровня подробности вывода;-q
— отключение вывода сообщений в консоль (режим тишины);-d
— запуск в фоновом режиме;-h
— отображение справки по использованию;-V
— отображение версии;--ini
— отображение примера конфигурационного файла;-s
— остановка выполнения — отправка сигналаSIGINT
запущенному процессу;-k
— завершение работы — отправка сигналаSIGTERM
запущенному процессу;-r
— перезагрузка конфигурации — отправка сигналаSIGHUP
запущенному процессу.
Для окончания конфигурирования сервиса требуется его перезагрузка:
sudo systemctl reload pangolin-pgqd
Настройка расширения pgq
-
Настройте количество таблиц очереди перед ее созданием, выполнив следующий запрос:
SET ROLE tuz_pgq_admin;
-- <n> - number of desired tables per queue
ALTER TABLE pgq.queue ALTER COLUMN queue_ntables SET DEFAULT <n>; -
Настройте остальные параметры очереди:
SET ROLE tuz_pgq_admin;
-- <queue_param> one of:
-- queue_ticker_max_count
-- queue_ticker_max_lag
-- queue_ticker_idle_period
-- queue_ticker_paused
-- queue_rotation_period
-- queue_external_ticker
SELECT * FROM pgq.set_queue_config('<queue_name>','<queue_param>','<param_value>'); -
Проверьте наполнение очереди (количество элементов, скорость наполнения, работоспособность расширения):
SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.get_queue_info(); -
Проверьте количество сообщений, ожидающих обработки:
SET ROLE tuz_pgq_admin;
SELECT * FROM get_consumer_info(['<queue_name>','<consumer_name>']); -
Настройте регламентные операции для таблиц очередей - добавьте дополнительные обработки для таблиц очередей:
SET ROLE tuz_pgq_admin;
UPDATE pgq.queue SET queue_extra_maint = ARRAY['<func1>','<func2>'] WHERE queue_name='<queue_name>';
Изменение типа данных для сообщений (на json/b
или xml
) не рекомендуется. Основная функция для добавления сообщений pgq.insert_event_raw
написана на языке C, скомпилирована в расширение и не предусматривает изменение состава полей.
Изменение позиции повторной отправки сообщения (event retry
) не предусмотрена. Повторно отправляемое сообщение помещается в конец очереди.
Повторная приоритизация возможна созданием дополнительной очереди настройки приоритетов на уровне бизнес-логики.
Проверка работоспособности
При штатной работе менеджера очередей:
- наблюдается увеличение значений поля
last_tick_id
; - проводятся периодические подключения к БД, что может приводить к росту объема логов. После установки функциональности рекомендуется пересмотреть настройки аудита для пользователя, под которым будет производиться работа менеджера очередей.
При нештатной работе менеджера очередей наблюдаются одинаковые значение поля last_tick_id
между итерациями, а так же возрастание значений в поле ticker_lag
.
Использование модуля
Обозначения переменных
Далее в примерах имеются следующие обозначения:
<name_queue>
- наименование очереди;<type_event>
- тип события;<event_information>
- информация о событии;<name_consumer>
- наименование потребителя;<id_event>
- идентификатор события;<id_batch>
- идентификатор последовательности;<second_until_next_attempt>
- количество секунд до повторной попытки выполнить действие.
Установка и проверка расширения pgq в СУБД Pangolin
-
Создайте базу данных
test_pgq
и подключитесь к ней:CREATE DATABASE test_pgq;
\c test_pgq -
Создайте расширение
pgq
:CREATE EXTENSION pgq;
-
Проверьте наличие базы данных
test_pgq
и расширенияpgq
на ведущем сервере:\dx pgq
Результатом выполнения — вывод версии расширения, например:
List of installed extensions
┌──────┬─────────┬────────────┬──────────────────────────────┐
│ Name │ Version │ Schema │ Description │
├──────┼─────────┼────────────┼──────────────────────────────┤
│ pgq │ 3.5.1 │ pg_catalog │ Generic queue for PostgreSQL │
└──────┴─────────┴────────────┴──────────────────────────────┘
(1 row)
Установка и запуск менеджера очередей pangolin-pgqd
-
Откройте конфигурационный файл
pangolin-pgqd.ini
, убедитесь в наличии параметраlogfile
и закомментированного параметраdatabase_list
. -
Перезапустите менеджер соединений
pangolin-pgqd
, убедитесь в его запуске с помощью команд:sudo systemctl restart pangolin-pgqd;
sudo systemctl status pangolin-pgqd; -
Проверьте автоматическое определение БД со стороны менеджера подключений
pangolin-pgqd
на ведущем сервере:sudo journalctl -n10 --no-pager -eu pangolin-pgqd
Результат выполнения команды содержит строку
LOG [test_pgq]: pgq version ok: {component_version}
. -
Проверьте автоматическое определение БД со стороны менеджера подключений
pangolin-pgqd
на ведомом сервере:sudo journalctl -n10 --no-pager -eu pangolin-pgqd
Результат выполнения команды содержит строки:
ERROR test_pgq: ERROR: cannot execute LOCK TABLE during recovery
ERROR: cannot execute pg_current_xact_id() during recovery
Работа с очередью от роли потребителя
-
Создание очереди:
SELECT * FROM pgq.create_queue(<name_queue> TEXT);
-
Добавление в очередь сообщения:
SELECT * FROM pgq.insert_event(<name_queue> TEXT, <type_event> TEXT, <event_information> TEXT);
-
Создание потребителя (consumer):
SELECT * FROM pgq.register_consumer(<name_queue> TEXT, <name_consumer> TEXT);
Внимание!Потребитель будет получать события только после собственной регистрации, поэтому возможность создавать события требует первичной «регистрации» потребителя.
-
Получение идентификатора (
ID
) блока последовательных сообщений в очереди:SELECT * FROM pgq.next_batch(<name_queue> TEXT, <name_consumer> TEXT);
-
Получение всех событий блока последовательных сообщений:
SELECT * FROM pgq.get_batch_events(<id_batch> BIGINT);
ПримечаниеВозвращенное значение может быть пустым.
-
Повторное помещение события в очередь:
SELECT * FROM pgq.event_retry(<id_batch> BIGINT, <id_event> BIGINT, <second_until_next_attempt> INTEGER);
Работа с очередью от роли отправителя
-
На ведущем сервере создайте пользователей для работы с ролевой моделью
pgq
:SET ROLE db_admin;
\c test_pgq
CREATE USER tuz_pgq_admin IN ROLE pgq_admin;
CREATE USER tuz_pgq_reader IN ROLE pgq_reader;
CREATE USER tuz_pgq_writer IN ROLE pgq_writer; -
Создайте очередь сообщений
test_q
:SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.create_queue('test_q'); -
Зарегистрируйте получателя сообщений:
SET ROLE tuz_pgq_reader;
SELECT pgq.register_consumer('test_q','test_consumer'); -
Заполните очередь 10 раз наборами из 10000 сообщений:
SET ROLE tuz_pgq_writer;
DO
$$
DECLARE
i int4;
BEGIN
FOR i IN 1..10 LOOP
PERFORM pgq.insert_event('test_q',id::text,format('MSG %1$s',id)) from generate_series(1,1000) id;
END LOOP;
END
$$;
\watch 0 -
Получите статистическую информацию по очереди и потребителям:
SET ROLE tuz_pgq_reader;
SELECT * FROM pgq.get_queue_info()\gx
SELECT * FROM pgq.get_consumer_info()\gxРезультат выполнения команды — получение таблиц вида:
SELECT * FROM pgq.get_queue_info()\gx
┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
│ queue_name │ test_q │
│ queue_ntables │ 3 │
│ queue_cur_table │ 0 │
│ queue_rotation_period │ 02:00:00 │
│ queue_switch_time │ 2023-11-16 12:57:13.566682+03 │
│ queue_external_ticker │ f │
│ queue_ticker_paused │ f │
│ queue_ticker_max_count │ 500 │
│ queue_ticker_max_lag │ 00:00:03 │
│ queue_ticker_idle_period │ 00:01:00 │
│ ticker_lag │ 00:00:04.742343 │
│ ev_per_sec │ 770.3491295563064 │
│ ev_new │ 0 │
│ last_tick_id │ 53 │
└──────────────────────────┴───────────────────────────────┘
SELECT * FROM pgq.get_consumer_info()\gx
┌─[ RECORD 1 ]───┬─────────────────┐
│ queue_name │ test_q │
│ consumer_name │ test_consumer │
│ lag │ 00:02:09.63673 │
│ last_seen │ 00:01:56.731471 │
│ last_tick │ 36 │
│ current_batch │ NULL │
│ next_tick │ NULL │
│ pending_events │ 125050 │
└────────────────┴─────────────────┘ПримечаниеЗначения контрольных точек времени (tick) могут изменяться.
-
Пример автоматизированного конвейера получения и обработки сеансов:
SET ROLE tuz_pgq_reader;
DO
$$
DECLARE
v_batch_id int8;
v_event_cnt int8;
BEGIN
SELECT batch_id FROM pgq.next_batch_info('test_q','test_consumer') INTO v_batch_id;
IF v_batch_id ISNULL THEN
RETURN;
END IF;
SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt;
RAISE NOTICE 'Got % events from batch %', v_event_cnt::text,v_batch_id::TEXT;
-- re-queue random 10 events at 5 secs;
IF v_event_cnt > 1 THEN
PERFORM pgq.event_retry(v_batch_id,ev_id,5) FROM pgq.get_batch_events(v_batch_id) ORDER BY random() LIMIT 10;
END IF;
PERFORM pgq.finish_batch(v_batch_id);
END;
$$;
\watch 2При успешном результате наблюдается последовательное увеличение
batch_id
и периодическое получение сообщений до исчерпания очереди. -
Получите статистическую информацию по очереди и потребителям:
SET ROLE tuz_pgq_reader;
SELECT * FROM pgq.get_queue_info()\gx
SELECT * FROM pgq.get_consumer_info()\gxРезультат выполнения команды — получение таблиц вида:
SELECT * FROM pgq.get_queue_info()\gx
┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
│ queue_name │ test_q │
│ queue_ntables │ 3 │
│ queue_cur_table │ 0 │
│ queue_rotation_period │ 02:00:00 │
│ queue_switch_time │ 2023-11-16 12:57:13.566682+03 │
│ queue_external_ticker │ f │
│ queue_ticker_paused │ f │
│ queue_ticker_max_count │ 500 │
│ queue_ticker_max_lag │ 00:00:03 │
│ queue_ticker_idle_period │ 00:01:00 │
│ ticker_lag │ 00:00:28.175869 │
│ ev_per_sec │ 568.1271702331466 │
│ ev_new │ 0 │
│ last_tick_id │ 63 │
└──────────────────────────┴───────────────────────────────┘
SELECT * FROM pgq.get_consumer_info()\gx
┌─[ RECORD 1 ]───┬─────────────────┐
│ queue_name │ test_q │
│ consumer_name │ test_consumer │
│ lag │ 00:01:16.866497 │
│ last_seen │ 00:01:16.028043 │
│ last_tick │ 61 │
│ current_batch │ NULL │
│ next_tick │ NULL │
│ pending_events │ 20 │
└────────────────┴─────────────────┘При успешном результате наблюдается увеличение
last_tick_id
, уменьшение размера очереди (pending_events
). -
Удалите очередь:
SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.drop_queue(x_queue_name =>'test_q', x_force => TRUE);Результат выполнения команды — получение таблиц вида:
┌────────────┐
│ drop_queue │
├────────────┤
│ 1 │
└────────────┘
Диагностика ошибок
В этом разделе описана информация, с помощью которой можно диагностировать неполадки.
Менеджер очередей
Проверка менеджера очередей на уровне SQL выполняется аналогично контролю наполнения очереди. В нормальной ситуации:
- поле
last_tick_id
регулярно увеличивается — это признак активности менеджера очередей и их обработки; - поле
ticker_lag
остается в пределах допустимого времени задержки.
При сбоях:
- значение
last_tick_id
не изменяется между итерациями; - значение
ticker_lag
постепенно растет, что указывает на отсутствие новых событий от менеджера очередей.
При нормальной работе менеджера очередей наблюдаются периодические подключения к БД. Это может вызвать рост объема логов при включенном аудите. Рекомендуется пересмотреть параметры аудита для роли, от имени которой работает менеджер очередей.
Очереди сообщений
Иногда требуется временно приостановить обработку очереди, например, при выполнении обслуживания или отладки. Это можно сделать вручную через вызовы SQL-функций.
Приостановление очереди на прием:
SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','true');
Возобновление работы очереди сообщений:
SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','false');
Ссылки на документацию разработчика
Функции расширения pgq
: https://pgq.github.io/extension/pgq/files/external-sql.html.