Перейти к основному содержимому

pgq. Организация монопольных очередей сообщений

Версия: 3.5.1.

В исходном дистрибутиве установлено по умолчанию: нет.

Связанные компоненты: pgqd.

Схема размещения: pgq.

Описание

Модуль предназначен для организации очередей сообщений и состоит из непосредственно расширения pgq для postgres и демона pgqd.

При организации очередей следует избегать большого количества отдельных сообщений меньшего объема в пользу меньшего количества более объемных сообщений. Таблицы сообщений не содержат дополнительных индексов, позволяющих фильтровать условия видимости сообщений, и ориентированы, прежде всего, на скорость вставки. Компенсирующей мерой является штатное разделение таблицы очередей на набор наследуемых таблиц (INHERIT TABLE), регламентные операции по каждой из которых разделены по времени. По умолчанию, количество наследуемых таблиц равно 3, может быть скорректировано пользователем до создания очереди.

Функции

ФункцияВходные параметрыВозвращаемое значениеОписание
create_queuetextintСоздает новую очередь с заданным именем
drop_queuetext, boolintegerУдаляет очередь и все связанные с ней таблицы
drop_queuetextintegerУдаляет очередь и все связанные с ней таблицы, при этом ни один из процессов не должен зависеть от очереди
set_queue_configtext, text, textintegerЗадает конфигурацию для конкретной очереди (возвращает 0, если событие уже в очереди, в противном случае возвращает 1)
insert_eventtext, text, textbigintДобавляет событие в очередь
insert_eventtext, text, text, text, text, text, textbigintДобавляет в очередь событие со всеми дополнительными полями
current_event_tabletexttextВозвращает активное событие таблицы для конкретной очереди. Событие может быть добавлено в нее через функцию, например, при вызове COPY
register_consumertext, textПодписывает «потребителя» (consumer) на очередь, после чего он будет видеть все события в очереди
register_consumertext, text, bigintintegerРасширяет регистрацию возможностью задать tick_id
register_consumer_attext, text, bigintintegerИспользуется для отложенной регистрации
unregister_consumertext, textintegerИсключает «потребителя» (consumer) из очереди
next_batch_infotext, text, int8, int8, int8, timestamptz, timestamptz, int8, int8int8Переходит к следующему элементу очереди, NULL при отсутствии
next_batchtext, textint8Возвращает идентификатор элемента очереди, NULL при отсутствии
next_batch_customtext, text, interval, int4, interval, int8, int8, int8, timestamptz, timestamptz, int8, int8int8Возвращает идентификатор элемента очереди
get_batch_eventsbigint, bigint, timestamptz, bigint, int4, text, text, text, text, text, textsetof recordВозвращает список всех элементов очереди
get_batch_cursorbigint, text, int4, text, bigint, timestamptz, bigint, int4, text, text, text, text, text, textsetof recordВозвращает список событий
event_retrybigint, bigint, timestamptzintegerПомещает событие в очередь для повторного вызова в дальнейшем
finish_batchbigintintegerЗакрывает блок сообщений
get_queue_infotext, integer, integer, interval, timestamptz, boolean, boolean, integer, interval, interval, interval, float8, bigint, bigintsetof recordВыводит информацию о всех очередях
get_consumer_infotext, text, interval, bigint, bigint, bigint, bigintsetof recordВыводит информацию о «потребителях» (consumer) во всех очередях

Доработка

Доработка не проводилась.

Ограничения

Ограничения отсутствуют.

Установка

Установка расширения (ОС «Альт» Astra Linux):

sudo apt-get install /usr/pangolin-6.3/3rdparty/pgq/pangolin-pgq-3.5.1-{OS}.x86_64.rpm -y

Для остальных ОС:

sudo dnf install /usr/pangolin-6.3/3rdparty/pgq/pangolin-pgq-3.5.1-{OS}.x86_64.rpm -y

Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE в текущей базе данных:

CREATE EXTENSION pgq;

Настройка

Настройка менеджера очередей pgqd

Менеджер очереди является системным сервисом, предоставляющим элементы синхронизации (tick).

При запуске сервиса происходит:

  • подключение к сконфигурированной сущности;
  • поиск установленного расширения по всем базам данным, доступным для подключения;
  • вызов функции pgq.ticker() c заданной периодичностью;
  • вызов функций pgq.maint_rotate_tables_step1(), pgq.maint_rotate_tables_step2() c заданной периодичностью;
  • вызов функции pgq.maint_retry_events() с заданной периодичностью.

Изменение конфигурации pgqd производится с использованием конфигурационного файла pgqd.ini:

[pgqd]
# where to log
logfile = ~/log/pgqd.log
# pidfile
pidfile = ~/pid/pgqd.pid

## optional parameters ##
# libpq connect string without dbname=
#base_connstr =
# startup db to query other databases
#initial_database = template1
# limit ticker to specific databases
#database_list = feat_pgq
# log into syslog
syslog = 1
syslog_ident = pgqd

## optional timeouts ##
# how often to check for new databases
check_period = 60
# how often to flush retry queue
retry_period = 30
# how often to do maintentance
#maint_period = 120
# how often to run ticker
ticker_period = 1

Описание конфигурационных параметров:

Конфигурационный параметрЗначение по умолчаниюОписание
logfile~/log/pgqd.logПуть к log-файлу
pidfile~/pid/pgqd.pidПуть к pid-файлу
base_connstr-Строка подключения к сущности (libpq syntax). Не должна включать имя базы данных
initial_databasetemplate1База данных для автоматического подключения (при отсутствии параметра database_list)
database_list-Список баз данных для подключения. Если не задан - производится автоматический поиск
syslog1Признак логирования действий в syslog
syslog_identpgqdИдентификатор syslog
check_period60Периодичность поиска новых баз данных (в случае автоматического поиска), сек
retry_period30Периодичность поиска сообщений, повторно добавляемых в очередь, сек
maint_period120Периодичность проведения регламентных операций, сек
ticker_period1Периодичность синхронизации очередей, сек

Доступные параметры командной строки при использовании утилиты:

./pgqd --help
usage: pgq-ticker [switches] config.file
Switches:
-v Increase verbosity
-q No output to console
-d Daemonize
-h Show help
-V Show version
--ini Show sample config file
-s Stop - send SIGINT to running process
-k Kill - send SIGTERM to running process
-r Reload - send SIGHUP to running process

Для окончания конфигурирования сервиса требуется его перезагрузка (reload):

sudo systemctl reload pgqd

Настройка расширения pgq

  1. Настройте количество таблиц очереди перед ее созданием, выполнив следующий запрос:

    SET ROLE tuz_pgq_admin;
    -- <n> - number of desired tables per queue
    ALTER TABLE pgq.queue ALTER COLUMN queue_ntables SET DEFAULT <n>;
  2. Настройте остальные параметры очереди:

    SET ROLE tuz_pgq_admin;
    -- <queue_param> one of:
    -- queue_ticker_max_count
    -- queue_ticker_max_lag
    -- queue_ticker_idle_period
    -- queue_ticker_paused
    -- queue_rotation_period
    -- queue_external_ticker
    SELECT * FROM pgq.set_queue_config('<queue_name>','<queue_param>','<param_value>');
  3. Проверьте наполнение очереди (количество элементов, скорость наполнения, работоспособность расширения):

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.get_queue_info();
  4. Проверьте количество сообщений, ожидающих обработки:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM get_consumer_info(['<queue_name>','<consumer_name>']);
  5. Настройте регламентные операции для таблиц очередей - добавьте дополнительные обработки для таблиц очередей:

    SET ROLE tuz_pgq_admin;
    UPDATE pgq.queue SET queue_extra_maint = ARRAY['<func1>','<func2>'] WHERE queue_name='<queue_name>';

Примечание:

Изменение типа данных для сообщений (на json/b или xml) не рекомендуется. Основная функция для добавления сообщений pgq.insert_event_raw написана на языке Си, скомпилирована в расширение и не предусматривает изменение состава полей.

Изменение позиции повторной отправки сообщения (event retry) не предусмотрена. Повторно отправляемое сообщение помещается в конец очереди. Повторная приоритизация возможна созданием дополнительной очереди настройки приоритетов на уровне бизнес-логики.

Проверка работоспособности

При штатной работе менеджера очередей:

  • Наблюдается увеличение значений поля last_tick_id.
  • Проводятся периодические подключения к БД, что может приводить к росту объема логов. После установки функциональности рекомендуется пересмотреть настройки аудита для пользователя, под которым будет производиться работа менеджера очередей.

Примечание:

При нештатной работе менеджера очередей наблюдаются одинаковые значение поля last_tick_id между итерациями, а так же возрастание значений в поле ticker_lag.

Использование модуля

Обозначения переменных

Далее в примерах имеются следующие обозначения:

  • <name_queue> - наименование очереди;
  • <type_event> - тип события;
  • <event_information> - информация о событии;
  • <name_consumer> - наименование потребителя;
  • <id_event> - идентификатор события;
  • <id_batch> - идентификатор последовательности;
  • <second_until_next_attempt> - количество секунд до повторной попытки выполнить действие.

Создание очереди:

 select * from pgq.create_queue(<name_queue> text);

Добавление в очередь сообщения:

select * from pgq.insert_event(<name_queue> text, <type_event> text, <event_information> text);

Создание «потребителя» (consumer):

select * from pgq.register_consumer(<name_queue> text, <name_consumer> text);

Внимание!

Потребитель будет получать события только после собственной регистрации, поэтому возможность создавать события требует первичной «регистрации» потребителя.

Получение идентификатора (ID) блока последовательных сообщений в очереди:

select * from pgq.next_batch(<name_queue> text, <name_consumer> text);

Получение всех событий блока последовательных сообщений:

select * from pgq.get_batch_events(<id_batch> bigint);

Примечание:

Возвращенное значение может быть пустым.

Повторное помещение события в очередь:

select * from pgq.event_retry(<id_batch> bigint, <id_event> bigint, <second_until_next_attempt> integer);

Приостановление очереди на прием:

SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','true');

Возобновление работы очереди сообщений:

SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','false');

Ссылки на документацию разработчика

Функции расширения pgq: https://pgq.github.io/extension/pgq/files/external-sql.html.