pgq. Организация монопольных очередей сообщений
Версия: 3.5.1.
В исходном дистрибутиве установлено по умолчанию: нет.
Связанные компоненты:
pgqd
.Схема размещения:
pgq
.
Описание
Модуль предназначен для организации очередей сообщений и состоит из непосредственно расширения pgq
для postgres и демона pgqd
.
При организации очередей следует избегать большого количества отдельных сообщений меньшего объема в пользу меньшего количества более объемных сообщений. Таблицы сообщений не содержат дополнительных индексов, позволяющих фильтровать условия видимости сообщений, и ориентированы, прежде всего, на скорость вставки. Компенсирующей мерой является штатное разделение таблицы очередей на набор наследуемых таблиц (INHERIT TABLE
), регламентные операции по каждой из которых разделены по времени. По умолчанию, количество наследуемых таблиц равно 3, может быть скорректировано пользователем до создания очереди.
Функции
Функция | Входные параметры | Возвращаемое значение | Описание |
---|---|---|---|
create_queue | text | int | Создает новую очередь с заданным именем |
drop_queue | text, bool | integer | Удаляет очередь и все связанные с ней таблицы |
drop_queue | text | integer | Удаляет очередь и все связанные с ней таблицы, при этом ни один из процессов не должен зависеть от очереди |
set_queue_config | text, text, text | integer | Задает конфигурацию для конкретной очереди (возвращает 0, если событие уже в очереди, в противном случае возвращает 1) |
insert_event | text, text, text | bigint | Добавляет событие в очередь |
insert_event | text, text, text, text, text, text, text | bigint | Добавляет в очередь событие со всеми дополнительными полями |
current_event_table | text | text | Возвращает активное событие таблицы для конкретной очереди. Событие может быть добавлено в нее через функцию, например, при вызове COPY |
register_consumer | text, text | Подписывает «потребителя» (consumer) на очередь, после чего он будет видеть все события в очереди | |
register_consumer | text, text, bigint | integer | Расширяет регистрацию возможностью задать tick_id |
register_consumer_at | text, text, bigint | integer | Используется для отложенной регистрации |
unregister_consumer | text, text | integer | Исключает «потребителя» (consumer) из очереди |
next_batch_info | text, text, int8, int8, int8, timestamptz, timestamptz, int8, int8 | int8 | Переходит к следующему элементу очереди, NULL при отсутствии |
next_batch | text, text | int8 | Возвращает идентификатор элемента очереди, NULL при отсутствии |
next_batch_custom | text, text, interval, int4, interval, int8, int8, int8, timestamptz, timestamptz, int8, int8 | int8 | Возвращает идентификатор элемента очереди |
get_batch_events | bigint, bigint, timestamptz, bigint, int4, text, text, text, text, text, text | setof record | Возвр ащает список всех элементов очереди |
get_batch_cursor | bigint, text, int4, text, bigint, timestamptz, bigint, int4, text, text, text, text, text, text | setof record | Возвращает список событий |
event_retry | bigint, bigint, timestamptz | integer | Помещает событие в очередь для повторного вызова в дальнейшем |
finish_batch | bigint | integer | Закрывает блок сообщений |
get_queue_info | text, integer, integer, interval, timestamptz, boolean, boolean, integer, interval, interval, interval, float8, bigint, bigint | setof record | Выводит информацию о всех очередях |
get_consumer_info | text, text, interval, bigint, bigint, bigint, bigint | setof record | Выводит информацию о «потребителях» (consumer) во всех очередях |
Доработка
Доработка не проводилась.
Ограничения
Ограничения отсутствуют.
Установка
Установка расширения (ОС «Альт» Astra Linux):
sudo apt-get install /usr/pangolin-6.3/3rdparty/pgq/pangolin-pgq-3.5.1-{OS}.x86_64.rpm -y
Для остальных ОС:
sudo dnf install /usr/pangolin-6.3/3rdparty/pgq/pangolin-pgq-3.5.1-{OS}.x86_64.rpm -y
Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE
в текущей базе данных:
CREATE EXTENSION pgq;
Настройка
Настройка менеджера очередей pgqd
Менеджер очереди является системным сервисом, предоставляющим элементы синхронизации (tick).
При запуске сервиса происходит:
- подключение к сконфигурированной сущности;
- поиск установленного расширения по всем базам данным, доступным для подключения;
- вызов функции
pgq.ticker()
c заданной периодичностью; - вызов функций
pgq.maint_rotate_tables_step1()
,pgq.maint_rotate_tables_step2()
c заданной периодичностью; - вызов функции
pgq.maint_retry_events()
с заданной периодичностью.
Изменение конфигурации pgqd
производится с использованием конфигурационного файла pgqd.ini
:
[pgqd]
# where to log
logfile = ~/log/pgqd.log
# pidfile
pidfile = ~/pid/pgqd.pid
## optional parameters ##
# libpq connect string without dbname=
#base_connstr =
# startup db to query other databases
#initial_database = template1
# limit ticker to specific databases
#database_list = feat_pgq
# log into syslog
syslog = 1
syslog_ident = pgqd
## optional timeouts ##
# how often to check for new databases
check_period = 60
# how often to flush retry queue
retry_period = 30
# how often to do maintentance
#maint_period = 120
# how often to run ticker
ticker_period = 1
Описание конфигурационных параметров:
Конфигурационный параметр | Значение по умолчанию | Описание |
---|---|---|
logfile | ~/log/pgqd.log | Путь к log -файлу |
pidfile | ~/pid/pgqd.pid | Путь к pid -файлу |
base_connstr | - | Строка подключения к сущности (libpq syntax ). Не должна включать имя базы данных |
initial_database | template1 | База данных для автоматического подключения (при отсутствии параметра database_list ) |
database_list | - | Список баз данных для подключения. Если не задан - производится автоматический поиск |
syslog | 1 | Признак логирования действий в syslog |
syslog_ident | pgqd | Идентификатор syslog |
check_period | 60 | Периодичность поиска новых баз данных (в случае автоматического поиска), сек |
retry_period | 30 | Периодичность поиска сообщений, повторно добавляемых в очередь, сек |
maint_period | 120 | Периодичность проведения регламентных операций, сек |
ticker_period | 1 | Периодичность синхронизации очередей, сек |
Доступные параметры командной строки при использовании утилиты:
./pgqd --help
usage: pgq-ticker [switches] config.file
Switches:
-v Increase verbosity
-q No output to console
-d Daemonize
-h Show help
-V Show version
--ini Show sample config file
-s Stop - send SIGINT to running process
-k Kill - send SIGTERM to running process
-r Reload - send SIGHUP to running process
Для окончания конфигурирования сервиса требуется его перезагрузка (reload):
sudo systemctl reload pgqd
Настройка расширения pgq
-
Настройте количество таблиц очереди перед ее созданием, выполнив следующий запрос:
SET ROLE tuz_pgq_admin;
-- <n> - number of desired tables per queue
ALTER TABLE pgq.queue ALTER COLUMN queue_ntables SET DEFAULT <n>; -
Настройте остальные параметры очереди:
SET ROLE tuz_pgq_admin;
-- <queue_param> one of:
-- queue_ticker_max_count
-- queue_ticker_max_lag
-- queue_ticker_idle_period
-- queue_ticker_paused
-- queue_rotation_period
-- queue_external_ticker
SELECT * FROM pgq.set_queue_config('<queue_name>','<queue_param>','<param_value>'); -
Проверьте наполнение очереди (количество элементов, скорость наполнения, работоспособность расширения):
SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.get_queue_info(); -
Проверьте количество сообщений, ожидающих обработки:
SET ROLE tuz_pgq_admin;
SELECT * FROM get_consumer_info(['<queue_name>','<consumer_name>']); -
Настройте регламентные операции для таблиц очередей - добавьте дополнительные обработки для таблиц очередей:
SET ROLE tuz_pgq_admin;
UPDATE pgq.queue SET queue_extra_maint = ARRAY['<func1>','<func2>'] WHERE queue_name='<queue_name>';
Примечание:
Изменение типа данных для сообщений (на
json/b
илиxml
) не рекомендуется. Основная функция для добавления сообщенийpgq.insert_event_raw
написана на языке Си, скомпилирована в расширение и не предусматривает изменение состава полей.Изменение позиции повторной отправки сообщения (
event retry
) не предусмотрена. Повторно отправляемое сообщение помещается в конец очереди. Повторная приоритиза ция возможна созданием дополнительной очереди настройки приоритетов на уровне бизнес-логики.
Проверка работоспособности
При штатной работе менеджера очередей:
- Наблюдается увеличение значений поля
last_tick_id
. - Проводятся периодические подключения к БД, что может приводить к росту объема логов. После установки функциональности рекомендуется пересмотреть настройки аудита для пользователя, под которым будет производиться работа менеджера очередей.
Примечание:
При нештатной работе менеджера очередей наблюдаются одинаковые значение поля
last_tick_id
между итерациями, а так же возрастание значений в полеticker_lag
.