Перейти к основному содержимому

pgq. Организация монопольных очередей сообщений

Версия: 3.5.1.

В исходном дистрибутиве установлено по умолчанию: нет.

Связанные компоненты: pangolin-pgqd.

Схема размещения: pgq.

Описание

Модуль предназначен для организации очередей сообщений и состоит из непосредственно расширения pgq для postgres и демона pangolin-pgqd.

Полезная нагрузка сообщения может содержать от 2 до 6 текстовых полей. Тип данных text в данном случае выбран, как наиболее общий базовый тип.

При организации очередей следует избегать большого количества отдельных сообщений меньшего объема в пользу меньшего количества более объемных сообщений. Таблицы сообщений не содержат дополнительных индексов, позволяющих фильтровать условия видимости сообщений, и ориентированы, прежде всего, на скорость вставки. Компенсирующей мерой является штатное разделение таблицы очередей на набор наследуемых таблиц (INHERIT TABLE), регламентные операции по каждой из которых разделены по времени. По умолчанию, количество наследуемых таблиц равно 3 (может быть скорректировано пользователем до создания очереди).

Доработка

Доработка не проводилась.

Ограничения

Ограничения отсутствуют.

Установка

sudo dnf install pangolin-pgq-{version_component}-{OS}.x86_64.rpm
Подсказка

Пример заполненной команды:

cd distributive

sudo apt-get -y install pangolin-pgq-3.5.1-rhel8.7.x86_64.rpm

Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE в текущей базе данных:

CREATE EXTENSION pgq;

Настройка

Функции

Перечень функций расширения pgq, используемых для работы с очередями сообщений:

:

Функции расширения pgq

Функция
Входные параметры
Выходные параметры
Описание
pgq.create_queue
queue_name (TEXT)
-
Создание очереди с заданным именем.

Параметры функции:

- queue_name — имя создаваемой очереди
pgq.drop_queue
queue_name (TEXT)
force (BOOL)
-
Удаление очереди с заданным именем.

Параметры функции:

- queue_name — имя очереди;
- Если force установлено в true, то удаление происходит принудительно (даже в случае, если очередь содержит события)
pgq.set_queue_config
queue_name (TEXT)
param_name (TEXT)
param_value (TEXT)
-
Задание параметров очереди с заданным именем.

Параметры функции:

- queue_name — имя очереди;
- param_name — ключ конфигурации;
- param_value — новое значение

Допустимые параметры:
- queue_ticker_max_count
- queue_ticker_max_lag
- queue_ticker_idle_period
- queue_ticker_paused
- queue_rotation_period
- queue_external_ticker

attention Количество таблиц в очереди невозможно изменить после создания очереди.

Для изменения количества таблиц следует воспользоваться запросом (перед созданием очереди):

ALTER TABLE pgq.queue ALTER COLUMN queue_ntables SET DEFAULT <n>;

:::

    • pgq.insert_event

    • queue_name (TEXT)
      ev_type (TEXT)
      ev_data (TEXT)
      ev_extra1 (TEXT)
      ev_extra2 (TEXT)
      ev_extra3 (TEXT)
      ev_extra4 (TEXT)

    • -

    • Добавление нового сообщения в очередь.

      Параметры функции:

      • queue_name — имя очереди;
      • ev_type — тип сообщения;
      • ev_data — данные сообщения;
      • ev_extra1..ev_extra4 — дополнительная информация

      При использовании дополнительных параметров должны быть определены все 4 параметра, NULL допускается

    • pgq.current_event_table

    • queue_name (TEXT)

    • -

    • Отображение текущей таблицы очереди, в которую будут добавляться записи.

      Параметры функции:

      • queue_name — имя очереди
    • pgq.register_consumer

    • queue_name (TEXT)
      consumer_id (TEXT)

    • -

    • Регистрация нового потребителя для очереди queue_name.

      Параметры функции:

      • queue_name — имя очереди, с которой будет работать потребитель;
      • consumer_id — уникальный идентификатор потребителя
    • pgq.register_consumer_at

    • queue_name (TEXT)
      consumer_name (TEXT)
      tick_pos (BIGINT)

    • -

    • Отложенная регистрация нового потребителя для очереди queue_name, начиная с tick_pos.

      Параметры функции:

      • queue_name — имя очереди;
      • consumer_name — имя потребителя;
      • tick_pos — позиция сообщения
    • pgq.unregister_consumer

    • queue_name (TEXT)
      consumer_name (TEXT)

    • -

    • Отмена регистрации потребителя для очереди queue_name.

      Параметры функции:

      • queue_name — имя очереди;
      • consumer_name — имя потребителя
    • pgq.next_batch_info

    • queue_name (TEXT)
      consumer_name (TEXT)

    • batch_id (INT8)
      cur_tick_id (INT8)
      prev_tick_id (INT8)
      cur_tick_time (TIMESTAMPTZ)
      prev_tick_time (TIMESTAMPTZ)
      cur_tick_event_seq (INT8)
      prev_tick_event_seq (INT8)

    • Получение информации о сеансе сообщений очереди queue_name для consumer consumer_name. Информация включает номер группы, позиции временных меток, временные значения, а также последовательности событий. Последовательности событий важны при ограничении размера сеанса сообщений или использовании курсоров.

      Параметры функции:

      • queue_name — имя очереди;
      • consumer_name — имя потребителя;
      • batch_id — идентификатор сеанса сообщений;
      • cur_tick_id — текущий идентификатор контрольной точки;
      • prev_tick_id — предыдущий идентификатор контрольной точки;
      • cur_tick_time — время текущей контрольной точки;
      • prev_tick_time — время предыдущей контрольной точки;
      • cur_tick_event_seq — последовательность событий для текущего тикета;
      • prev_tick_event_seq — последовательность событий для предыдущего тикета
    • pgq.next_batch

    • queue_name (TEXT)
      consumer_name (TEXT)

    • -

    • Устаревшая процедура получения информации о состоянии сообщений.

      Параметры функции:

      • queue_name — имя очереди;
      • consumer_name — имя потребителя
    • pgq.next_batch_custom

    • queue_name (TEXT)
      consumer_name (TEXT)
      min_lag (INTERVAL)
      min_count (INT4)
      min_interval (INTERVAL)

    • batch_id (INT8)
      cur_tick_id (INT8)
      prev_tick_id (INT8)
      cur_tick_time (TIMESTAMPTZ)
      prev_tick_time (TIMESTAMPTZ)
      cur_tick_event_seq (INT8)
      prev_tick_event_seq (INT8)

    • Расширенное получение информации о сеансе сообщений. Дополнительные параметры сеанса:

      • min_lag — записи, старше определенного диапазона;
      • min_count — минимальное количество записей;
      • min_interval — минимальный интервал заполнения очереди.

      Параметры функции:

      • queue_name — имя очереди;
      • consumer_name — имя потребителя;
      • min_lag — минимальная задержка для событий;
      • min_count — минимальное количество событий в сеансе сообщений;
      • min_interval — минимальный интервал между событиями;
      • batch_id — идентификатор сеанса сообщений;
      • cur_tick_id — текущий идентификатор контрольной точки;
      • prev_tick_id — предыдущий идентификатор контрольной точки;
      • cur_tick_time — время текущей контрольной точки;
      • prev_tick_time — время предыдущей контрольной точки;
      • cur_tick_event_seq — последовательность событий для текущей контрольной точки;
      • prev_tick_event_seq — последовательность событий для предыдущей контрольной точки
    • pgq.get_batch_events

    • batch_id (BIGINT)

    • ev_id (BIGINT)
      ev_time (TIMESTAMPTZ)
      ev_txid (BIGINT)
      ev_retry (INT4)
      ev_type (TEXT)
      ev_data (TEXT)
      ev_extra1 (TEXT)
      ev_extra2 (TEXT)
      ev_extra3 (TEXT)
      ev_extra4 (TEXT)

    • Получение сообщений из сеанса сообщений.

      Параметры функции:

      • batch_id — идентификатор сеанса сообщений;
      • ev_id — идентификатор события;
      • ev_time — время события;
      • ev_txid — идентификатор транзакции;
      • ev_retry — количество предыдущих попыток обработки;
      • ev_type — тип события;
      • ev_data TEXT — данные события;
      • ev_extra1..ev_extra4 — дополнительная информация
    • pgq.get_batch_cursor

    • batch_id (BIGINT)
      cursor_name (TEXT)
      quick_limit (INT4)
      extra_where (TEXT)

    • ev_id (BIGINT)
      ev_time (TIMESTAMPTZ)
      ev_txid (BIGINT)
      ev_retry (INT4)
      ev_type (TEXT)
      ev_data (TEXT)
      ev_extra1 (TEXT)
      ev_extra2 (TEXT)
      ev_extra3 (TEXT)
      ev_extra4 (TEXT)

    • Получение сообщений из сеанса сообщений с помощью курсора с заданным именем и, опционально, дополнительным условием.

      Параметры функции:

      • batch_id — идентификатор сеанса сообщений;
      • cursor_name — имя курсора;
      • quick_limit — лимит на количество событий;
      • extra_where — дополнительное условие для выборки событий;
      • ev_id — идентификатор события;
      • ev_time — время события;
      • ev_txid — идентификатор транзакции;
      • ev_retry — количество предыдущих попыток обработки;
      • ev_type — тип события;
      • ev_data — данные события;
      • ev_extra1..ev_extra4 — дополнительная информация
    • pgq.event_retry

    • batch_id (BIGINT)
      event_id (BIGINT)
      retry_time (TIMESTAMPTZ) или retry_seconds (INT)

    • -

    • Повторная постановка в очередь сообщения, с заданием интервала повторного получения (относительное или абсолютное время).

      Параметры функции:

      • batch_id — идентификатор сеанса сообщений;
      • event_id — идентификатор события;
      • retry_time или retry_seconds — время для повторной попытки или количество секунд до повторной попытки
    • pgq.batch_retry

    • batch_id (BIGINT)
      retry_seconds (INT)

    • -

    • Повторная постановка всего сеанса сообщений в очередь по истечению интервала.

      Параметры функции:

      • batch_id — идентификатор сеанса сообщений;
      • retry_seconds — время для повторной попытки
    • pgq.finish_batch

    • batch_id (BIGINT)

    • -

    • Окончание обработки сеанса сообщений.

      Параметры функции:

      • batch_id — идентификатор сеанса сообщений
    • pgq.get_queue_info

    • -

    • queue_name (TEXT)
      queue_ntables (INT)
      queue_cur_table (INT)
      queue_rotation_period (INTERVAL)
      queue_switch_time (TIMESTAMPTZ)
      queue_external_ticker (BOOLEAN)
      queue_ticker_paused (BOOLEAN)
      queue_ticker_max_count (INT)
      queue_ticker_max_lag (INTERVAL)
      queue_ticker_idle_period (INTERVAL)
      ticker_lag (INTERVAL)
      ev_per_sec (FLOAT8)
      ev_new (BIGINT)
      last_tick_id (BIGINT)

    • Получение статистической информации об очередях.

      Параметры функции:

      • queue_name — имя очереди;
      • queue_ntables — количество таблиц в очереди;
      • queue_cur_table — текущая таблица;
      • queue_rotation_period — интервал времени, по истечении которого очередь переключается на новую таблицу;
      • queue_switch_time — время последнего переключения;
      • queue_external_ticker — используется ли внешний управляющий процесс;
      • queue_ticker_paused — временно ли остановлена очередь;
      • queue_ticker_max_count — максимальное количество событий до переключения;
      • queue_ticker_max_lag — максимальная задержка между переключениями;
      • queue_ticker_idle_period — период бездействия, по истечении которого происходит переключение;
      • ticker_lag — текущая задержка переключения;
      • ev_per_sec — количество событий в секунду;
      • ev_new — количество новых событий;
      • last_tick_id — последний идентификатор контрольной точки
    • pgq.get_consumer_info

    • queue_name (TEXT)
      consumer_name (TEXT)

    • queue_name (TEXT)
      consumer_name (TEXT)
      lag (INTERVAL)
      last_seen (INTERVAL)
      last_tick (BIGINT)
      current_batch (BIGINT)
      next_tick (BIGINT)
      pending_events (BIGINT)

    • Получение информации о состоянии потребителя: отставание, активность и очередность.

      Параметры функции:

      • queue_name — имя очереди;
      • consumer_name — имя потребителя;
      • lag — отставание потребителя от текущей позиции (временной интервал);
      • last_seen — время, прошедшее с последней активности потребителя;
      • last_tick — последняя обработанная контрольная точка;
      • current_batch — идентификатор текущего сеанса сообщений;
      • next_tick — ожидаемая следующая контрольная точка;
      • pending_events — количество ожидающих обработки сообщений
    • pgq.version
    • -
    • -
    • Получение версии расширения pgq.
    • pgq.get_batch_info

    • batch_id (BIGINT)

    • queue_name (TEXT)
      consumer_name (TEXT)
      batch_start (TIMESTAMPTZ)
      batch_end (TIMESTAMPTZ)
      prev_tick_id (BIGINT)
      tick_id (BIGINT)
      lag (INTERVAL)
      seq_start (BIGINT)
      seq_end (BIGINT)

    • Получение статистической информации о сеансе сообщений.

      Параметры функции:

      • batch_id — идентификатор сеанса сообщений;
      • queue_name — имя очереди;
      • consumer_name — имя потребителя; batch_start — начало сеанса; batch_end — конец сеанса; prev_tick_id — предыдущий идентификатор контрольной точки; tick_id — текущий идентификатор контрольной точки;
        lag — задержка обработки;
        seq_start — начальное значение последовательности; seq_end — конечное значение последовательности
    • pgq.batch_event_sql

    • batch_id (BIGINT)

    • -

    • Формирование SQL-запроса для извлечения сообщений из сеанса.

      Параметры функции:

      • batch_id — идентификатор сеанса сообщений
    • pgq.batch_event_tables

    • batch_id (BIGINT)

    • -

    • Получение списка таблиц, где могут находиться сообщения сеанса.

      Параметры функции:

      • batch_id — идентификатор сеанса сообщений
    • pgq.event_retry_raw

    • queue (TEXT)
      consumer (TEXT)
      retry_after (TIMESTAMPTZ)
      ev_id (BIGINT)
      ev_time (TIMESTAMPTZ)
      ev_retry (INT)
      ev_type (TEXT)
      ev_data (TEXT)
      ev_extra1 (TEXT)
      ev_extra2 (TEXT)
      ev_extra3 (TEXT)
      ev_extra4 (TEXT)

    • -

    • Интерфейс ручного управления сообщениями.

      Параметры функции:

      • queue — имя очереди;
      • consumer — имя потребителя;
      • retry_after — момент времени, когда повторно попытаться обработать событие;
      • ev_id — идентификатор события;
      • ev_time — исходное время события;
      • ev_retry — количество предыдущих попыток обработки;
      • ev_type — тип события;
      • ev_data — данные события;
      • ev_extra1..ev_extra4 — дополнительная информация
    • pgq.find_tick_helper

    • queue_id (INT4)
      prev_tick_id (BIGINT)
      prev_tick_time (TIMESTAMPTZ)
      prev_tick_seq (BIGINT)
      min_count (BIGINT)
      min_interval (INTERVAL)

    • next_tick_id (BIGINT)
      next_tick_time (TIMESTAMPTZ)
      next_tick_seq (BIGINT)

    • Вспомогательная функция для расчета следующей позиции обработки сообщений.

      Параметры функции:

      • queue_id — идентификатор очереди;
      • prev_tick_id — предыдущий идентификатор контрольной точки;
      • prev_tick_time — время предыдущей контрольной точки;
      • prev_tick_seq — последовательность предыдущей контрольной точки
      • min_count — минимальное количество событий в сеансе сообщений;
      • min_interval — минимальный интервал между точками.
      • next_tick_id— следующий идентификатор контрольной точки;
      • next_tick_time — время следующей контрольной точки;
      • next_tick_seq — последовательность следующей контрольной точки
    • pgq.ticker

    • queue_name (TEXT)
      tick_id (BIGINT)
      orig_timestamp (TIMESTAMPTZ)
      event_seq (BIGINT)

    • -

    • Регистрация контрольной точки обработки сообщений. При использовании внешнего сервиса требуется установить параметр queue_external_ticker.

      Переменные функции:

      • queue_name — имя очереди;
      • tick_id — текущий идентификатор контрольной точки;
      • orig_timestamp — исходное время;
      • event_seq — текущая последовательность событий
    • pgq.maint_retry_events
    • -
    • -
    • Повторная постановка в очередь помеченных сообщений. Выполняется частично и должна вызываться до получения кода возврата 0.
    • pgq.maint_rotate_tables_step1

    • queue_name (TEXT)

    • -

    • Первый шаг обслуживания: очистка таблиц очереди, не содержащих необработанных сообщений. Выполняется с блокировкой.

      Параметры функции:

      • queue_name — имя очереди
    • pgq.maint_rotate_tables_step2
    • -
    • -
    • Второй шаг обслуживания: обновление информации об очереди.
    • pgq.maint_tables_to_vacuum
    • -
    • -
    • Формирование списка таблиц очереди, подлежащих вакуумированию.
    • pgq.maint_operations

    • -

    • func_name (TEXT)
      func_arg (TEXT)

    • Формирование списка регламентных операций для обслуживания очереди.

      Параметры функции:

      • func_name — имя операции;
      • func_arg — аргумент операции
    • pgq.grant_perms

    • queue_name (TEXT)

    • -

    • Назначение прав доступа на таблицы очереди для роли public.

      Параметры функции:

      • queue_name — имя очереди
    • pgq.tune_storage

    • queue_name (TEXT)

    • -

    • Настройка параметров хранения для таблиц очереди: fillfactor, autovacuum.

      Параметры функции:

      • queue_name — имя очереди
    • pgq.force_tick

    • queue_name (TEXT)

    • -

    • Принудительная регистрация контрольной точки обработки сообщений.

      Параметры функции:

      • queue_name — имя очереди
    • pgq.seq_getval

    • seq_name (TEXT)

    • -

    • Аналог getval() для получения значения последовательности (для совместимости).

      Параметры функции:

      • seq_name — имя последовательности
    • pgq.seq_setval

    • seq_name (TEXT)
      new_value (BIGINT)

    • -

    • Аналог setval() для установки значения последовательности.

      Параметры функции:

      • seq_name — имя последовательности;
      • new_value — новое значение
    • pgq.quote_fqname

    • name (TEXT)

    • -

    • Форматирование полного имени объекта схемы и таблицы (для совместимости).

      Параметры функции:

      • name — имя объекта (схема,таблица) ::::

Триггеры

Одним из ключевых механизмов взаимодействия таблиц с очередями являются триггерные функции, автоматически добавляющие события в очередь при изменении данных в таблицах.

Триггерные функции в pgq служат для регистрации изменений в таблицах (INSERT, UPDATE, DELETE) и формирования событий, которые затем обрабатываются через очередь.

Перечень триггерных функций расширения pgq, используемых для работы с очередями сообщений:

Триггерные функции pgq

Триггер
Примеры использования
Описание
pgq.jsontriga()
-- AFTER TRIGGER 
CREATE TRIGGER trga_ins_queue AFTER INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.jsontriga('queue_name');

-- BEFORE TRIGGER
TRIGGER trgb_ins_queue BEFORE INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.jsontriga('queue_name','SKIP');
Добавление сообщений в очередь, имя которой задается первым аргументом триггерной функции. Сообщение кодируется в формате jsonb
pgq.logutriga()
-- AFTER TRIGGER 
CREATE TRIGGER trga_ins_queue AFTER INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('queue_name');

-- BEFORE TRIGGER
CREATE TRIGGER trgb_ins_queue BEFORE INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('queue_name','SKIP');
Добавление сообщений в очередь, имя которой задается первым аргументом триггерной функции. Сообщение кодируется в формате url-encoded (text)
pgq.sqltriga()
-- AFTER TRIGGER 
CREATE TRIGGER trga_ins_queue AFTER INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.sqltriga('queue_name');

-- BEFORE TRIGGER
CREATE TRIGGER trgb_ins_queue BEFORE INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.sqltriga('queue_name','SKIP');
Добавление сообщений в очередь, заданную первым параметром триггерной функции. Сообщение кодируется в формате форматированного SQL (text)

Триггеры pgq можно настраивать с помощью аргументов, передаваемых в объявлении триггера:

Аргументы триггерных функций

Аргумент
Назначение
SKIP
Пропуск выполнения операции. Поведение триггерной функции для BEFORE trigger
ignore=col1,...
Исключение указанных полей из JSON
pkey=col1,...
Явное указание полей, содержащих primary key
backup
Добавление предыдущего состояния строки (OLD.*) в поле ev_extra2
colname=EXPR
Переопределение значения одного из полей сообщения (ev_type, ev_data, ev_extra1–ev_extra4) на выражение EXPR
when=EXPR
Условие фильтрации: сообщение добавляется в очередь только если EXPR возвращает true

Сообщение, добавляемое в очередь с помощью триггерных функций имеет следующую структуру:

  • ev_type — тип операции: I (INSERT), U (UPDATE), D (DELETE), с указанием первичного ключа через двоеточие :;
  • ev_data — содержит значения полей таблицы в формате JSON;
  • ev_extra1 — мя таблицы, из которой поступило сообщение;
  • ev_extra2 — (опционально) значения полей до изменения (при наличии OLD.*, используется с аргументом backup).

Таблицы

Расширение pgq сопровождается рядом системных таблиц, которые обеспечивают работу этой механики:

:

Системные таблицы pgq

Таблица
Структура
Описание
pgq.consumer
CREATE TABLE pgq.consumer ( 
co_id serial4 NOT NULL, -- Consumer ID
co_name text NOT NULL, -- Consumer Name
CONSTRAINT consumer_name_uq UNIQUE (co_name),
CONSTRAINT consumer_pkey PRIMARY KEY (co_id)
);
Список зарегистрированных потребителей очередей (consumer)
pgq.event_template

details Cтруктура таблицы

CREATE TABLE pgq.event_template (
ev_id int8 NOT NULL, -- Event ID
ev_time timestamptz NOT NULL, -- Event Timestamp
ev_txid int8 NOT NULL DEFAULT txid_current(), -- Event XID
ev_owner int4 NULL, -- Event owner (Queue ID)
ev_retry int4 NULL, -- Event ID (retried)
ev_type text NULL, -- Event data
ev_data text NULL, -- Event data
ev_extra1 text NULL, -- Event extra data
ev_extra2 text NULL, -- Event extra data
ev_extra3 text NULL, -- Event extra data
ev_extra4 text NULL -- Event extra data
);

:::

  • Шаблон очереди сообщений. При создании очереди на его основе создаются таблицы событий. По умолчанию — 3 наследуемые таблицы.
    • pgq.queue

    • :::details Cтруктура таблицы

      CREATE TABLE pgq.queue (
      queue_id serial4 NOT NULL, -- queue id for internal usage
      queue_name text NOT NULL, -- queue name visible outside
      queue_ntables int4 NOT NULL DEFAULT 3, -- how many data tables the queue has
      queue_cur_table int4 NOT NULL DEFAULT 0, -- which data table is currently active
      queue_rotation_period interval NOT NULL DEFAULT '02:00:00'::interval, -- period for data table rotation
      queue_switch_step1 int8 NOT NULL DEFAULT txid_current(), -- tx when rotation happened
      queue_switch_step2 int8 NULL DEFAULT txid_current(), -- tx after rotation was committed
      queue_switch_time timestamptz NOT NULL DEFAULT now(), -- time when switch happened
      queue_external_ticker bool NOT NULL DEFAULT false, -- ticks come from some external sources
      queue_disable_insert bool NOT NULL DEFAULT false, -- disallow pgq.insert_event()
      queue_ticker_paused bool NOT NULL DEFAULT false, -- Is ticker paused
      queue_ticker_max_count int4 NOT NULL DEFAULT 500, -- batch should not contain more events
      queue_ticker_max_lag interval NOT NULL DEFAULT '00:00:03'::interval, -- events should not age more
      queue_ticker_idle_period interval NOT NULL DEFAULT '00:01:00'::interval, -- how often to tick when no events happen
      queue_per_tx_limit int4 NULL, -- Max number of events single TX can insert
      queue_data_pfx text NOT NULL, -- prefix for data table names
      queue_event_seq text NOT NULL, -- sequence for event id's
      queue_tick_seq text NOT NULL, -- sequence for tick id's
      queue_extra_maint _text NULL, -- array of functon names to call during maintenance
      CONSTRAINT queue_name_uq UNIQUE (queue_name),
      CONSTRAINT queue_pkey PRIMARY KEY (queue_id)
      );

      :::

    • Таблица зарегистрированных очередей сообщений

    • pgq.subscription

    • :::details Cтруктура таблицы

      CREATE TABLE pgq."subscription" (
      sub_id serial4 NOT NULL, -- Subscription ID
      sub_queue int4 NOT NULL, -- Queue ID
      sub_consumer int4 NOT NULL, -- Consumer ID
      sub_last_tick int8 NULL, -- Last Tick ID
      sub_active timestamptz NOT NULL DEFAULT now(), -- Last active timestamp
      sub_batch int8 NULL, -- Last Batch ID
      sub_next_tick int8 NULL, -- Next Tick ID
      CONSTRAINT subscription_batch_idx UNIQUE (sub_batch),
      CONSTRAINT subscription_pkey PRIMARY KEY (sub_queue, sub_consumer),
      CONSTRAINT sub_consumer_fkey FOREIGN KEY (sub_consumer) REFERENCES pgq.consumer(co_id),
      CONSTRAINT sub_queue_fkey FOREIGN KEY (sub_queue) REFERENCES pgq.queue(queue_id)
      );

      :::

    • Таблица, содержащая информацию о получателях потребителей на очереди

    • pgq.tick

    • :::details Cтруктура таблицы

      CREATE TABLE pgq.tick (
      tick_queue int4 NOT NULL, -- Queue ID
      tick_id int8 NOT NULL, -- Tick ID
      tick_time timestamptz NOT NULL DEFAULT now(), -- Tick timestamp
      tick_snapshot txid_snapshot NOT NULL DEFAULT txid_current_snapshot(), -- Tick XID
      tick_event_seq int8 NOT NULL, -- Tick event
      CONSTRAINT tick_pkey PRIMARY KEY (tick_queue, tick_id),
      CONSTRAINT tick_queue_fkey FOREIGN KEY (tick_queue) REFERENCES pgq.queue(queue_id)
      );

      :::

    • Таблица, фиксирующая контрольные точки времени — метки для синхронизации обработки событий ::::

Роли

В расширении pgq используются определенные роли, обеспечивающие контроль доступа к очередям, событиям и процессу их обработки:

Системные таблицы pgq

Роль
Описание
pgq_admin
Администратор расширения. Член ролей pgq_reader, pgq_writer. Позволяет создавать и управлять очередями сообщений
pgq_reader
Роль потребителя (consumer). Позволяет создавать сеанс сообщений, читать и управлять сообщением
pgq_writer
Роль отправителя очереди (provider). Позволяет помещать сообщения в очередь, в том числе с помощью триггеров

Настройка менеджера очередей pangolin-pgqd

Менеджер очереди является системным сервисом, предоставляющим элементы синхронизации (tick).

При запуске сервиса происходит:

  • подключение к сконфигурированной сущности;
  • поиск установленного расширения по всем базам данным, доступным для подключения;
  • вызов функции pgq.ticker() c заданной периодичностью;
  • вызов функций pgq.maint_rotate_tables_step1(), pgq.maint_rotate_tables_step2() c заданной периодичностью;
  • вызов функции pgq.maint_retry_events() с заданной периодичностью.

Изменение конфигурации pangolin-pgqd производится с использованием конфигурационного файла pangolin-pgqd.ini:

[pgqd]
# where to log
logfile = ~/log/pangolin-pgqd.log

# pidfile
pidfile = ~/pangolin-pgqd/pangolin-pgqd.pid

## optional parameters ##

# libpq connect string without dbname=
#base_connstr = host=<IP-адрес> port=<Порт> user=<Имя пользователя> password=<Пароль>

# startup db to query other databases
#initial_database = template1

# limit ticker to specific databases
#database_list =

# log into syslog
syslog = 1
syslog_ident = pgqd

## optional timeouts ##

# how often to check for new databases
check_period = 60

# how often to flush retry queue
retry_period = 30

# how often to do maintentance
#maint_period = 120

# how often to run ticker
ticker_period = 1

Описание конфигурационных параметров:

Конфигурационные параметры

Конфигурационный
параметр
Значение по умолчанию
Описание
logfile
~/log/pangolin-pgqd.log
Путь к лог-файлу
pidfile
~/pangolin-pgqd/pangolin-pgqd.pid
Путь к pid-файлу
base_connstr
-
Строка подключения к сущности (libpq syntax). Не должна включать имя базы данных
initial_database
template1
База данных для автоматического подключения (при отсутствии параметра database_list)
database_list
-
Список баз данных для подключения. Если не задан — производится автоматический поиск
syslog
1
Признак логирования действий в syslog
syslog_ident
pgqd
Идентификатор syslog
check_period
60
Периодичность поиска новых баз данных (в случае автоматического поиска), секунды
retry_period
30
Периодичность поиска сообщений, повторно добавляемых в очередь, секунды
maint_period
120
Периодичность проведения регламентных операций, секунды
ticker_period
1
Периодичность синхронизации очередей, секунды

Доступные параметры командной строки при использовании расширения:

./pgqd --help
usage: pgq-ticker [switches] config.file
Switches:
-v Increase verbosity
-q No output to console
-d Daemonize
-h Show help
-V Show version
--ini Show sample config file
-s Stop - send SIGINT to running process
-k Kill - send SIGTERM to running process
-r Reload - send SIGHUP to running process

Где:

  • -v — повышение уровня подробности вывода;
  • -q — отключение вывода сообщений в консоль (режим тишины);
  • -d — запуск в фоновом режиме;
  • -h — отображение справки по использованию;
  • -V — отображение версии;
  • --ini — отображение примера конфигурационного файла;
  • -s — остановка выполнения — отправка сигнала SIGINT запущенному процессу;
  • -k — завершение работы — отправка сигнала SIGTERM запущенному процессу;
  • -r — перезагрузка конфигурации — отправка сигнала SIGHUP запущенному процессу.

Для окончания конфигурирования сервиса требуется его перезагрузка:

sudo systemctl reload pangolin-pgqd

Настройка расширения pgq

  1. Настройте количество таблиц очереди перед ее созданием, выполнив следующий запрос:

    SET ROLE tuz_pgq_admin;
    -- <n> - number of desired tables per queue
    ALTER TABLE pgq.queue ALTER COLUMN queue_ntables SET DEFAULT <n>;
  2. Настройте остальные параметры очереди:

    SET ROLE tuz_pgq_admin;
    -- <queue_param> one of:
    -- queue_ticker_max_count
    -- queue_ticker_max_lag
    -- queue_ticker_idle_period
    -- queue_ticker_paused
    -- queue_rotation_period
    -- queue_external_ticker
    SELECT * FROM pgq.set_queue_config('<queue_name>','<queue_param>','<param_value>');
  3. Проверьте наполнение очереди (количество элементов, скорость наполнения, работоспособность расширения):

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.get_queue_info();
  4. Проверьте количество сообщений, ожидающих обработки:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM get_consumer_info(['<queue_name>','<consumer_name>']);
  5. Настройте регламентные операции для таблиц очередей - добавьте дополнительные обработки для таблиц очередей:

    SET ROLE tuz_pgq_admin;
    UPDATE pgq.queue SET queue_extra_maint = ARRAY['<func1>','<func2>'] WHERE queue_name='<queue_name>';
Примечание

Изменение типа данных для сообщений (на json/b или xml) не рекомендуется. Основная функция для добавления сообщений pgq.insert_event_raw написана на языке C, скомпилирована в расширение и не предусматривает изменение состава полей.

Изменение позиции повторной отправки сообщения (event retry) не предусмотрена. Повторно отправляемое сообщение помещается в конец очереди.

Повторная приоритизация возможна созданием дополнительной очереди настройки приоритетов на уровне бизнес-логики.

Проверка работоспособности

При штатной работе менеджера очередей:

  • наблюдается увеличение значений поля last_tick_id;
  • проводятся периодические подключения к БД, что может приводить к росту объема логов. После установки функциональности рекомендуется пересмотреть настройки аудита для пользователя, под которым будет производиться работа менеджера очередей.
Примечание

При нештатной работе менеджера очередей наблюдаются одинаковые значение поля last_tick_id между итерациями, а так же возрастание значений в поле ticker_lag.

Использование модуля

Обозначения переменных

Далее в примерах имеются следующие обозначения:

  • <name_queue> - наименование очереди;
  • <type_event> - тип события;
  • <event_information> - информация о событии;
  • <name_consumer> - наименование потребителя;
  • <id_event> - идентификатор события;
  • <id_batch> - идентификатор последовательности;
  • <second_until_next_attempt> - количество секунд до повторной попытки выполнить действие.

Установка и проверка расширения pgq в СУБД Pangolin

  1. Создайте базу данных test_pgq и подключитесь к ней:

    CREATE DATABASE test_pgq;
    \c test_pgq
  2. Создайте расширение pgq:

    CREATE EXTENSION pgq;
  3. Проверьте наличие базы данных test_pgq и расширения pgq на ведущем сервере:

    \dx pgq

    Результатом выполнения — вывод версии расширения, например:

                 List of installed extensions
    ┌──────┬─────────┬────────────┬──────────────────────────────┐
    │ Name │ Version │ Schema │ Description │
    ├──────┼─────────┼────────────┼──────────────────────────────┤
    │ pgq │ 3.5.1 │ pg_catalog │ Generic queue for PostgreSQL │
    └──────┴─────────┴────────────┴──────────────────────────────┘
    (1 row)

Установка и запуск менеджера очередей pangolin-pgqd

  1. Откройте конфигурационный файл pangolin-pgqd.ini, убедитесь в наличии параметра logfile и закомментированного параметра database_list.

  2. Перезапустите менеджер соединений pangolin-pgqd, убедитесь в его запуске с помощью команд:

    sudo systemctl restart pangolin-pgqd;
    sudo systemctl status pangolin-pgqd;
  3. Проверьте автоматическое определение БД со стороны менеджера подключений pangolin-pgqd на ведущем сервере:

    sudo journalctl -n10 --no-pager -eu pangolin-pgqd

    Результат выполнения команды содержит строку LOG [test_pgq]: pgq version ok: {component_version}.

  4. Проверьте автоматическое определение БД со стороны менеджера подключений pangolin-pgqd на ведомом сервере:

    sudo journalctl -n10 --no-pager -eu pangolin-pgqd

    Результат выполнения команды содержит строки:

    ERROR test_pgq: ERROR:  cannot execute LOCK TABLE during recovery
    ERROR: cannot execute pg_current_xact_id() during recovery

Работа с очередью от роли потребителя

  1. Создание очереди:

    SELECT * FROM pgq.create_queue(<name_queue> TEXT);
  2. Добавление в очередь сообщения:

    SELECT * FROM pgq.insert_event(<name_queue> TEXT, <type_event> TEXT, <event_information> TEXT);
  3. Создание потребителя (consumer):

    SELECT * FROM pgq.register_consumer(<name_queue> TEXT, <name_consumer> TEXT);
    Внимание!

    Потребитель будет получать события только после собственной регистрации, поэтому возможность создавать события требует первичной «регистрации» потребителя.

  4. Получение идентификатора (ID) блока последовательных сообщений в очереди:

    SELECT * FROM pgq.next_batch(<name_queue> TEXT, <name_consumer> TEXT);
  5. Получение всех событий блока последовательных сообщений:

    SELECT * FROM pgq.get_batch_events(<id_batch> BIGINT);
    Примечание

    Возвращенное значение может быть пустым.

  6. Повторное помещение события в очередь:

    SELECT * FROM pgq.event_retry(<id_batch> BIGINT, <id_event> BIGINT, <second_until_next_attempt> INTEGER);

Работа с очередью от роли отправителя

  1. На ведущем сервере создайте пользователей для работы с ролевой моделью pgq:

    SET ROLE db_admin;
    \c test_pgq
    CREATE USER tuz_pgq_admin IN ROLE pgq_admin;
    CREATE USER tuz_pgq_reader IN ROLE pgq_reader;
    CREATE USER tuz_pgq_writer IN ROLE pgq_writer;
  2. Создайте очередь сообщений test_q:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.create_queue('test_q');
  3. Зарегистрируйте получателя сообщений:

    SET ROLE tuz_pgq_reader;
    SELECT pgq.register_consumer('test_q','test_consumer');
  4. Заполните очередь 10 раз наборами из 10000 сообщений:

    SET ROLE tuz_pgq_writer;
    DO
    $$
    DECLARE
    i int4;
    BEGIN
    FOR i IN 1..10 LOOP
    PERFORM pgq.insert_event('test_q',id::text,format('MSG %1$s',id)) from generate_series(1,1000) id;
    END LOOP;
    END
    $$;
    \watch 0
  5. Получите статистическую информацию по очереди и потребителям:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.get_queue_info()\gx
    SELECT * FROM pgq.get_consumer_info()\gx

    Результат выполнения команды — получение таблиц вида:

    SELECT * FROM pgq.get_queue_info()\gx
    ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
    │ queue_name │ test_q │
    │ queue_ntables │ 3
    │ queue_cur_table │ 0
    │ queue_rotation_period │ 02:00:00
    │ queue_switch_time │ 2023-11-16 12:57:13.566682+03
    │ queue_external_ticker │ f │
    │ queue_ticker_paused │ f │
    │ queue_ticker_max_count │ 500
    │ queue_ticker_max_lag │ 00:00:03
    │ queue_ticker_idle_period │ 00:01:00
    │ ticker_lag │ 00:00:04.742343
    │ ev_per_sec │ 770.3491295563064
    │ ev_new │ 0
    │ last_tick_id │ 53
    └──────────────────────────┴───────────────────────────────┘
    SELECT * FROM pgq.get_consumer_info()\gx
    ┌─[ RECORD 1 ]───┬─────────────────┐
    │ queue_name │ test_q │
    │ consumer_name │ test_consumer │
    │ lag │ 00:02:09.63673
    │ last_seen │ 00:01:56.731471
    │ last_tick │ 36
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ 125050
    └────────────────┴─────────────────┘
    Примечание

    Значения контрольных точек времени (tick) могут изменяться.

  6. Пример автоматизированного конвейера получения и обработки сеансов:

    SET ROLE tuz_pgq_reader;
    DO
    $$
    DECLARE
    v_batch_id int8;
    v_event_cnt int8;
    BEGIN
    SELECT batch_id FROM pgq.next_batch_info('test_q','test_consumer') INTO v_batch_id;
    IF v_batch_id ISNULL THEN
    RETURN;
    END IF;
    SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt;
    RAISE NOTICE 'Got % events from batch %', v_event_cnt::text,v_batch_id::TEXT;
    -- re-queue random 10 events at 5 secs;
    IF v_event_cnt > 1 THEN
    PERFORM pgq.event_retry(v_batch_id,ev_id,5) FROM pgq.get_batch_events(v_batch_id) ORDER BY random() LIMIT 10;
    END IF;
    PERFORM pgq.finish_batch(v_batch_id);
    END;
    $$;
    \watch 2

    При успешном результате наблюдается последовательное увеличение batch_id и периодическое получение сообщений до исчерпания очереди.

  7. Получите статистическую информацию по очереди и потребителям:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.get_queue_info()\gx
    SELECT * FROM pgq.get_consumer_info()\gx

    Результат выполнения команды — получение таблиц вида:

    SELECT * FROM pgq.get_queue_info()\gx
    ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
    │ queue_name │ test_q │
    │ queue_ntables │ 3
    │ queue_cur_table │ 0
    │ queue_rotation_period │ 02:00:00
    │ queue_switch_time │ 2023-11-16 12:57:13.566682+03
    │ queue_external_ticker │ f │
    │ queue_ticker_paused │ f │
    │ queue_ticker_max_count │ 500
    │ queue_ticker_max_lag │ 00:00:03
    │ queue_ticker_idle_period │ 00:01:00
    │ ticker_lag │ 00:00:28.175869
    │ ev_per_sec │ 568.1271702331466
    │ ev_new │ 0
    │ last_tick_id │ 63
    └──────────────────────────┴───────────────────────────────┘
    SELECT * FROM pgq.get_consumer_info()\gx
    ┌─[ RECORD 1 ]───┬─────────────────┐
    │ queue_name │ test_q │
    │ consumer_name │ test_consumer │
    │ lag │ 00:01:16.866497
    │ last_seen │ 00:01:16.028043
    │ last_tick │ 61
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ 20
    └────────────────┴─────────────────┘

    При успешном результате наблюдается увеличение last_tick_id, уменьшение размера очереди (pending_events).

  8. Удалите очередь:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.drop_queue(x_queue_name =>'test_q', x_force => TRUE);

    Результат выполнения команды — получение таблиц вида:

    ┌────────────┐
    │ drop_queue │
    ├────────────┤
    1
    └────────────┘

Диагностика ошибок

В этом разделе описана информация, с помощью которой можно диагностировать неполадки.

Менеджер очередей

Проверка менеджера очередей на уровне SQL выполняется аналогично контролю наполнения очереди. В нормальной ситуации:

  • поле last_tick_id регулярно увеличивается — это признак активности менеджера очередей и их обработки;
  • поле ticker_lag остается в пределах допустимого времени задержки.

При сбоях:

  • значение last_tick_id не изменяется между итерациями;
  • значение ticker_lag постепенно растет, что указывает на отсутствие новых событий от менеджера очередей.
Примечание

При нормальной работе менеджера очередей наблюдаются периодические подключения к БД. Это может вызвать рост объема логов при включенном аудите. Рекомендуется пересмотреть параметры аудита для роли, от имени которой работает менеджер очередей.

Очереди сообщений

Иногда требуется временно приостановить обработку очереди, например, при выполнении обслуживания или отладки. Это можно сделать вручную через вызовы SQL-функций.

Приостановление очереди на прием:

SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','true');

Возобновление работы очереди сообщений:

SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','false');

Ссылки на документацию разработчика

Функции расширения pgq: https://pgq.github.io/extension/pgq/files/external-sql.html.