Перейти к основному содержимому

pgq. Организация монопольных очередей сообщений

Версия: 3.5.1.

В исходном дистрибутиве установлено по умолчанию: нет.

Связанные компоненты: pangolin-pgqd.

Схема размещения: pgq.

Описание

Модуль предназначен для организации очередей сообщений и состоит из непосредственно расширения pgq для postgres и демона pangolin-pgqd.

Полезная нагрузка сообщения может содержать от 2 до 6 текстовых полей. Тип данных text в данном случае выбран, как наиболее общий базовый тип.

При организации очередей следует избегать большого количества отдельных сообщений меньшего объема в пользу меньшего количества более объемных сообщений. Таблицы сообщений не содержат дополнительных индексов, позволяющих фильтровать условия видимости сообщений, и ориентированы, прежде всего, на скорость вставки. Компенсирующей мерой является штатное разделение таблицы очередей на набор наследуемых таблиц (INHERIT TABLE), регламентные операции по каждой из которых разделены по времени. По умолчанию, количество наследуемых таблиц равно 3 (может быть скорректировано пользователем до создания очереди).

Доработка

Доработка не проводилась.

Ограничения

Ограничения отсутствуют.

Установка

sudo dnf install pangolin-pgq-{version_component}-{OS}.x86_64.rpm
Подсказка

Пример заполненной команды:

cd distributive

sudo apt-get -y install pangolin-pgq-3.5.1-altlinux10.x86_64.rpm

Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE в текущей базе данных:

CREATE EXTENSION pgq;

Настройка

Функции

Перечень функций расширения pgq, используемых для работы с очередями сообщений:

Функции расширения pgq

Функция

Входные параметры

Выходные параметры

Описание

pgq.create_queue

queue_name (TEXT)

-

Создание очереди с заданным именем.

Параметры функции:

  • queue_name — имя создаваемой очереди

pgq.drop_queue

queue_name (TEXT)
force (BOOL)

-

Удаление очереди с заданным именем.

Параметры функции:

  • queue_name — имя очереди;
  • Если force установлено в true, то удаление происходит принудительно (даже в случае, если очередь содержит события)

pgq.set_queue_config

queue_name (TEXT)
param_name (TEXT)
param_value (TEXT)

-

Задание параметров очереди с заданным именем.

Параметры функции:

  • queue_name — имя очереди;
  • param_name — ключ конфигурации;
  • param_value — новое значение

Допустимые параметры:

  • queue_ticker_max_count
  • queue_ticker_max_lag
  • queue_ticker_idle_period
  • queue_ticker_paused
  • queue_rotation_period
  • queue_external_ticker
Внимание!

Количество таблиц в очереди невозможно изменить после создания очереди.

Для изменения количества таблиц следует воспользоваться запросом (перед созданием очереди):

ALTER TABLE pgq.queue ALTER COLUMN queue_ntables SET DEFAULT <n>;

pgq.insert_event

queue_name (TEXT)
ev_type (TEXT)
ev_data (TEXT)
ev_extra1 (TEXT)
ev_extra2 (TEXT)
ev_extra3 (TEXT)
ev_extra4 (TEXT)

-

Добавление нового сообщения в очередь.

Параметры функции:

  • queue_name — имя очереди;
  • ev_type — тип сообщения;
  • ev_data — данные сообщения;
  • ev_extra1..ev_extra4 — дополнительная информация

При использовании дополнительных параметров должны быть определены все 4 параметра, NULL допускается

pgq.current_event_table

queue_name (TEXT)

-

Отображение текущей таблицы очереди, в которую будут добавляться записи.

Параметры функции:

  • queue_name — имя очереди

pgq.register_consumer

queue_name (TEXT)
consumer_id (TEXT)

-

Регистрация нового потребителя для очереди queue_name.

Параметры функции:

  • queue_name — имя очереди, с которой будет работать потребитель;
  • consumer_id — уникальный идентификатор потребителя

pgq.register_consumer_at

queue_name (TEXT)
consumer_name (TEXT)
tick_pos (BIGINT)

-

Отложенная регистрация нового потребителя для очереди queue_name, начиная с tick_pos.

Параметры функции:

  • queue_name — имя очереди;
  • consumer_name — имя потребителя;
  • tick_pos — позиция сообщения

pgq.unregister_consumer

queue_name (TEXT)
consumer_name (TEXT)

-

Отмена регистрации потребителя для очереди queue_name.

Параметры функции:

  • queue_name — имя очереди;
  • consumer_name — имя потребителя

pgq.next_batch_info

queue_name (TEXT)
consumer_name (TEXT)

batch_id (INT8)
cur_tick_id (INT8)
prev_tick_id (INT8)
cur_tick_time (TIMESTAMPTZ)
prev_tick_time (TIMESTAMPTZ)
cur_tick_event_seq (INT8)
prev_tick_event_seq (INT8)

Получение информации о сеансе сообщений очереди queue_name для consumer consumer_name. Информация включает номер группы, позиции временных меток, временные значения, а также последовательности событий. Последовательности событий важны при ограничении размера сеанса сообщений или использовании курсоров.

Параметры функции:

  • queue_name — имя очереди;
  • consumer_name — имя потребителя;
  • batch_id — идентификатор сеанса сообщений;
  • cur_tick_id — текущий идентификатор контрольной точки;
  • prev_tick_id — предыдущий идентификатор контрольной точки;
  • cur_tick_time — время текущей контрольной точки;
  • prev_tick_time — время предыдущей контрольной точки;
  • cur_tick_event_seq — последовательность событий для текущего тикета;
  • prev_tick_event_seq — последовательность событий для предыдущего тикета

pgq.next_batch

queue_name (TEXT)
consumer_name (TEXT)

-

Устаревшая процедура получения информации о состоянии сообщений.

Параметры функции:

  • queue_name — имя очереди;
  • consumer_name — имя потребителя

pgq.next_batch_custom

queue_name (TEXT)
consumer_name (TEXT)
min_lag (INTERVAL)
min_count (INT4)
min_interval (INTERVAL)

batch_id (INT8)
cur_tick_id (INT8)
prev_tick_id (INT8)
cur_tick_time (TIMESTAMPTZ)
prev_tick_time (TIMESTAMPTZ)
cur_tick_event_seq (INT8)
prev_tick_event_seq (INT8)

Расширенное получение информации о сеансе сообщений. Дополнительные параметры сеанса:

  • min_lag — записи, старше определенного диапазона;
  • min_count — минимальное количество записей;
  • min_interval — минимальный интервал заполнения очереди.

Параметры функции:

  • queue_name — имя очереди;
  • consumer_name — имя потребителя;
  • min_lag — минимальная задержка для событий;
  • min_count — минимальное количество событий в сеансе сообщений;
  • min_interval — минимальный интервал между событиями;
  • batch_id — идентификатор сеанса сообщений;
  • cur_tick_id — текущий идентификатор контрольной точки;
  • prev_tick_id — предыдущий идентификатор контрольной точки;
  • cur_tick_time — время текущей контрольной точки;
  • prev_tick_time — время предыдущей контрольной точки;
  • cur_tick_event_seq — последовательность событий для текущей контрольной точки;
  • prev_tick_event_seq — последовательность событий для предыдущей контрольной точки

pgq.get_batch_events

batch_id (BIGINT)

ev_id (BIGINT)
ev_time (TIMESTAMPTZ)
ev_txid (BIGINT)
ev_retry (INT4)
ev_type (TEXT)
ev_data (TEXT)
ev_extra1 (TEXT)
ev_extra2 (TEXT)
ev_extra3 (TEXT)
ev_extra4 (TEXT)

Получение сообщений из сеанса сообщений.

Параметры функции:

  • batch_id — идентификатор сеанса сообщений;
  • ev_id — идентификатор события;
  • ev_time — время события;
  • ev_txid — идентификатор транзакции;
  • ev_retry — количество предыдущих попыток обработки;
  • ev_type — тип события;
  • ev_data TEXT — данные события;
  • ev_extra1..ev_extra4 — дополнительная информация

pgq.get_batch_cursor

batch_id (BIGINT)
cursor_name (TEXT)
quick_limit (INT4)
extra_where (TEXT)

ev_id (BIGINT)
ev_time (TIMESTAMPTZ)
ev_txid (BIGINT)
ev_retry (INT4)
ev_type (TEXT)
ev_data (TEXT)
ev_extra1 (TEXT)
ev_extra2 (TEXT)
ev_extra3 (TEXT)
ev_extra4 (TEXT)

Получение сообщений из сеанса сообщений с помощью курсора с заданным именем и, опционально, дополнительным условием.

Параметры функции:

  • batch_id — идентификатор сеанса сообщений;
  • cursor_name — имя курсора;
  • quick_limit — лимит на количество событий;
  • extra_where — дополнительное условие для выборки событий;
  • ev_id — идентификатор события;
  • ev_time — время события;
  • ev_txid — идентификатор транзакции;
  • ev_retry — количество предыдущих попыток обработки;
  • ev_type — тип события;
  • ev_data — данные события;
  • ev_extra1..ev_extra4 — дополнительная информация

pgq.event_retry

batch_id (BIGINT)
event_id (BIGINT)
retry_time (TIMESTAMPTZ) или retry_seconds (INT)

-

Повторная постановка в очередь сообщения, с заданием интервала повторного получения (относительное или абсолютное время).

Параметры функции:

  • batch_id — идентификатор сеанса сообщений;
  • event_id — идентификатор события;
  • retry_time или retry_seconds — время для повторной попытки или количество секунд до повторной попытки

pgq.batch_retry

batch_id (BIGINT)
retry_seconds (INT)

-

Повторная постановка всего сеанса сообщений в очередь по истечению интервала.

Параметры функции:

  • batch_id — идентификатор сеанса сообщений;
  • retry_seconds — время для повторной попытки

pgq.finish_batch

batch_id (BIGINT)

-

Окончание обработки сеанса сообщений.

Параметры функции:

  • batch_id — идентификатор сеанса сообщений

pgq.get_queue_info

-

queue_name (TEXT)
queue_ntables (INT)
queue_cur_table (INT)
queue_rotation_period (INTERVAL)
queue_switch_time (TIMESTAMPTZ)
queue_external_ticker (BOOLEAN)
queue_ticker_paused (BOOLEAN)
queue_ticker_max_count (INT)
queue_ticker_max_lag (INTERVAL)
queue_ticker_idle_period (INTERVAL)
ticker_lag (INTERVAL)
ev_per_sec (FLOAT8)
ev_new (BIGINT)
last_tick_id (BIGINT)

Получение статистической информации об очередях.

Параметры функции:

  • queue_name — имя очереди;
  • queue_ntables — количество таблиц в очереди;
  • queue_cur_table — текущая таблица;
  • queue_rotation_period — интервал времени, по истечении которого очередь переключается на новую таблицу;
  • queue_switch_time — время последнего переключения;
  • queue_external_ticker — используется ли внешний управляющий процесс;
  • queue_ticker_paused — временно ли остановлена очередь;
  • queue_ticker_max_count — максимальное количество событий до переключения;
  • queue_ticker_max_lag — максимальная задержка между переключениями;
  • queue_ticker_idle_period — период бездействия, по истечении которого происходит переключение;
  • ticker_lag — текущая задержка переключения;
  • ev_per_sec — количество событий в секунду;
  • ev_new — количество новых событий;
  • last_tick_id — последний идентификатор контрольной точки

pgq.get_consumer_info

queue_name (TEXT)
consumer_name (TEXT)

queue_name (TEXT)
consumer_name (TEXT)
lag (INTERVAL)
last_seen (INTERVAL)
last_tick (BIGINT)
current_batch (BIGINT)
next_tick (BIGINT)
pending_events (BIGINT)

Получение информации о состоянии потребителя: отставание, активность и очередность.

Параметры функции:

  • queue_name — имя очереди;
  • consumer_name — имя потребителя;
  • lag — отставание потребителя от текущей позиции (временной интервал);
  • last_seen — время, прошедшее с последней активности потребителя;
  • last_tick — последняя обработанная контрольная точка;
  • current_batch — идентификатор текущего сеанса сообщений;
  • next_tick — ожидаемая следующая контрольная точка;
  • pending_events — количество ожидающих обработки сообщений

pgq.version

-

-

Получение версии расширения pgq.

pgq.get_batch_info

batch_id (BIGINT)

queue_name (TEXT)
consumer_name (TEXT)
batch_start (TIMESTAMPTZ)
batch_end (TIMESTAMPTZ)
prev_tick_id (BIGINT)
tick_id (BIGINT)
lag (INTERVAL)
seq_start (BIGINT)
seq_end (BIGINT)

Получение статистической информации о сеансе сообщений.

Параметры функции:

  • batch_id — идентификатор сеанса сообщений;
  • queue_name — имя очереди;
  • consumer_name — имя потребителя; batch_start — начало сеанса; batch_end — конец сеанса; prev_tick_id — предыдущий идентификатор контрольной точки; tick_id — текущий идентификатор контрольной точки;
    lag — задержка обработки;
    seq_start — начальное значение последовательности; seq_end — конечное значение последовательности

pgq.batch_event_sql

batch_id (BIGINT)

-

Формирование SQL-запроса для извлечения сообщений из сеанса.

Параметры функции:

  • batch_id — идентификатор сеанса сообщений

pgq.batch_event_tables

batch_id (BIGINT)

-

Получение списка таблиц, где могут находиться сообщения сеанса.

Параметры функции:

  • batch_id — идентификатор сеанса сообщений

pgq.event_retry_raw

queue (TEXT)
consumer (TEXT)
retry_after (TIMESTAMPTZ)
ev_id (BIGINT)
ev_time (TIMESTAMPTZ)
ev_retry (INT)
ev_type (TEXT)
ev_data (TEXT)
ev_extra1 (TEXT)
ev_extra2 (TEXT)
ev_extra3 (TEXT)
ev_extra4 (TEXT)

-

Интерфейс ручного управления сообщениями.

Параметры функции:

  • queue — имя очереди;
  • consumer — имя потребителя;
  • retry_after — момент времени, когда повторно попытаться обработать событие;
  • ev_id — идентификатор события;
  • ev_time — исходное время события;
  • ev_retry — количество предыдущих попыток обработки;
  • ev_type — тип события;
  • ev_data — данные события;
  • ev_extra1..ev_extra4 — дополнительная информация

pgq.find_tick_helper

queue_id (INT4)
prev_tick_id (BIGINT)
prev_tick_time (TIMESTAMPTZ)
prev_tick_seq (BIGINT)
min_count (BIGINT)
min_interval (INTERVAL)

next_tick_id (BIGINT)
next_tick_time (TIMESTAMPTZ)
next_tick_seq (BIGINT)

Вспомогательная функция для расчета следующей позиции обработки сообщений.

Параметры функции:

  • queue_id — идентификатор очереди;
  • prev_tick_id — предыдущий идентификатор контрольной точки;
  • prev_tick_time — время предыдущей контрольной точки;
  • prev_tick_seq — последовательность предыдущей контрольной точки
  • min_count — минимальное количество событий в сеансе сообщений;
  • min_interval — минимальный интервал между точками.
  • next_tick_id— следующий идентификатор контрольной точки;
  • next_tick_time — время следующей контрольной точки;
  • next_tick_seq — последовательность следующей контрольной точки

pgq.ticker

queue_name (TEXT)
tick_id (BIGINT)
orig_timestamp (TIMESTAMPTZ)
event_seq (BIGINT)

-

Регистрация контрольной точки обработки сообщений. При использовании внешнего сервиса требуется установить параметр queue_external_ticker.

Переменные функции:

  • queue_name — имя очереди;
  • tick_id — текущий идентификатор контрольной точки;
  • orig_timestamp — исходное время;
  • event_seq — текущая последовательность событий

pgq.maint_retry_events

-

-

Повторная постановка в очередь помеченных сообщений. Выполняется частично и должна вызываться до получения кода возврата 0.

pgq.maint_rotate_tables_step1

queue_name (TEXT)

-

Первый шаг обслуживания: очистка таблиц очереди, не содержащих необработанных сообщений. Выполняется с блокировкой.

Параметры функции:

  • queue_name — имя очереди

pgq.maint_rotate_tables_step2

-

-

Второй шаг обслуживания: обновление информации об очереди.

pgq.maint_tables_to_vacuum

-

-

Формирование списка таблиц очереди, подлежащих вакуумированию.

pgq.maint_operations

-

func_name (TEXT)
func_arg (TEXT)

Формирование списка регламентных операций для обслуживания очереди.

Параметры функции:

  • func_name — имя операции;
  • func_arg — аргумент операции

pgq.grant_perms

queue_name (TEXT)

-

Назначение прав доступа на таблицы очереди для роли public.

Параметры функции:

  • queue_name — имя очереди

pgq.tune_storage

queue_name (TEXT)

-

Настройка параметров хранения для таблиц очереди: fillfactor, autovacuum.

Параметры функции:

  • queue_name — имя очереди

pgq.force_tick

queue_name (TEXT)

-

Принудительная регистрация контрольной точки обработки сообщений.

Параметры функции:

  • queue_name — имя очереди

pgq.seq_getval

seq_name (TEXT)

-

Аналог getval() для получения значения последовательности (для совместимости).

Параметры функции:

  • seq_name — имя последовательности

pgq.seq_setval

seq_name (TEXT)
new_value (BIGINT)

-

Аналог setval() для установки значения последовательности.

Параметры функции:

  • seq_name — имя последовательности;
  • new_value — новое значение

pgq.quote_fqname

name (TEXT)

-

Форматирование полного имени объекта схемы и таблицы (для совместимости).

Параметры функции:

  • name — имя объекта (схема,таблица)

Триггеры

Одним из ключевых механизмов взаимодействия таблиц с очередями являются триггерные функции, автоматически добавляющие события в очередь при изменении данных в таблицах.

Триггерные функции в pgq служат для регистрации изменений в таблицах (INSERT, UPDATE, DELETE) и формирования событий, которые затем обрабатываются через очередь.

Перечень триггерных функций расширения pgq, используемых для работы с очередями сообщений:

Триггерные функции pgq

Триггер

Примеры использования

Описание

pgq.jsontriga()

-- AFTER TRIGGER
CREATE TRIGGER trga_ins_queue AFTER INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.jsontriga('queue_name');

-- BEFORE TRIGGER
TRIGGER trgb_ins_queue BEFORE INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.jsontriga('queue_name','SKIP');

Добавление сообщений в очередь, имя которой задается первым аргументом триггерной функции. Сообщение кодируется в формате jsonb

pgq.logutriga()

-- AFTER TRIGGER
CREATE TRIGGER trga_ins_queue AFTER INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('queue_name');

-- BEFORE TRIGGER
CREATE TRIGGER trgb_ins_queue BEFORE INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('queue_name','SKIP');

Добавление сообщений в очередь, имя которой задается первым аргументом триггерной функции. Сообщение кодируется в формате url-encoded (text)

pgq.sqltriga()

-- AFTER TRIGGER
CREATE TRIGGER trga_ins_queue AFTER INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.sqltriga('queue_name');

-- BEFORE TRIGGER
CREATE TRIGGER trgb_ins_queue BEFORE INSERT OR UPDATE ON triggered_table
FOR EACH ROW EXECUTE PROCEDURE pgq.sqltriga('queue_name','SKIP');

Добавление сообщений в очередь, заданную первым параметром триггерной функции. Сообщение кодируется в формате форматированного SQL (text)

Триггеры pgq можно настраивать с помощью аргументов, передаваемых в объявлении триггера:

Аргументы триггерных функций

Аргумент

Назначение

SKIP

Пропуск выполнения операции. Поведение триггерной функции для BEFORE trigger

ignore=col1,...

Исключение указанных полей из JSON

pkey=col1,...

Явное указание полей, содержащих primary key

backup

Добавление предыдущего состояния строки (OLD.*) в поле ev_extra2

colname=EXPR

Переопределение значения одного из полей сообщения (ev_type, ev_data, ev_extra1–ev_extra4) на выражение EXPR

when=EXPR

Условие фильтрации: сообщение добавляется в очередь только если EXPR возвращает true

Сообщение, добавляемое в очередь с помощью триггерных функций имеет следующую структуру:

  • ev_type — тип операции: I (INSERT), U (UPDATE), D (DELETE), с указанием первичного ключа через двоеточие :;
  • ev_data — содержит значения полей таблицы в формате JSON;
  • ev_extra1 — мя таблицы, из которой поступило сообщение;
  • ev_extra2 — (опционально) значения полей до изменения (при наличии OLD.*, используется с аргументом backup).

Таблицы

Расширение pgq сопровождается рядом системных таблиц, которые обеспечивают работу этой механики:

Системные таблицы pgq

Таблица

Структура

Описание

pgq.consumer

CREATE TABLE pgq.consumer (
co_id serial4 NOT NULL, -- Consumer ID
co_name text NOT NULL, -- Consumer Name
CONSTRAINT consumer_name_uq UNIQUE (co_name),
CONSTRAINT consumer_pkey PRIMARY KEY (co_id)
);

Список зарегистрированных потребителей очередей (consumer)

pgq.event_template

Cтруктура таблицы
CREATE TABLE pgq.event_template (
ev_id int8 NOT NULL, -- Event ID
ev_time timestamptz NOT NULL, -- Event Timestamp
ev_txid int8 NOT NULL DEFAULT txid_current(), -- Event XID
ev_owner int4 NULL, -- Event owner (Queue ID)
ev_retry int4 NULL, -- Event ID (retried)
ev_type text NULL, -- Event data
ev_data text NULL, -- Event data
ev_extra1 text NULL, -- Event extra data
ev_extra2 text NULL, -- Event extra data
ev_extra3 text NULL, -- Event extra data
ev_extra4 text NULL -- Event extra data
);

Шаблон очереди сообщений. При создании очереди на его основе создаются таблицы событий. По умолчанию — 3 наследуемые таблицы.

pgq.queue

Cтруктура таблицы
CREATE TABLE pgq.queue (
queue_id serial4 NOT NULL, -- queue id for internal usage
queue_name text NOT NULL, -- queue name visible outside
queue_ntables int4 NOT NULL DEFAULT 3, -- how many data tables the queue has
queue_cur_table int4 NOT NULL DEFAULT 0, -- which data table is currently active
queue_rotation_period interval NOT NULL DEFAULT '02:00:00'::interval, -- period for data table rotation
queue_switch_step1 int8 NOT NULL DEFAULT txid_current(), -- tx when rotation happened
queue_switch_step2 int8 NULL DEFAULT txid_current(), -- tx after rotation was committed
queue_switch_time timestamptz NOT NULL DEFAULT now(), -- time when switch happened
queue_external_ticker bool NOT NULL DEFAULT false, -- ticks come from some external sources
queue_disable_insert bool NOT NULL DEFAULT false, -- disallow pgq.insert_event()
queue_ticker_paused bool NOT NULL DEFAULT false, -- Is ticker paused
queue_ticker_max_count int4 NOT NULL DEFAULT 500, -- batch should not contain more events
queue_ticker_max_lag interval NOT NULL DEFAULT '00:00:03'::interval, -- events should not age more
queue_ticker_idle_period interval NOT NULL DEFAULT '00:01:00'::interval, -- how often to tick when no events happen
queue_per_tx_limit int4 NULL, -- Max number of events single TX can insert
queue_data_pfx text NOT NULL, -- prefix for data table names
queue_event_seq text NOT NULL, -- sequence for event id's
queue_tick_seq text NOT NULL, -- sequence for tick id's
queue_extra_maint _text NULL, -- array of functon names to call during maintenance
CONSTRAINT queue_name_uq UNIQUE (queue_name),
CONSTRAINT queue_pkey PRIMARY KEY (queue_id)
);

Таблица зарегистрированных очередей сообщений

pgq.subscription

Cтруктура таблицы
CREATE TABLE pgq."subscription" (
sub_id serial4 NOT NULL, -- Subscription ID
sub_queue int4 NOT NULL, -- Queue ID
sub_consumer int4 NOT NULL, -- Consumer ID
sub_last_tick int8 NULL, -- Last Tick ID
sub_active timestamptz NOT NULL DEFAULT now(), -- Last active timestamp
sub_batch int8 NULL, -- Last Batch ID
sub_next_tick int8 NULL, -- Next Tick ID
CONSTRAINT subscription_batch_idx UNIQUE (sub_batch),
CONSTRAINT subscription_pkey PRIMARY KEY (sub_queue, sub_consumer),
CONSTRAINT sub_consumer_fkey FOREIGN KEY (sub_consumer) REFERENCES pgq.consumer(co_id),
CONSTRAINT sub_queue_fkey FOREIGN KEY (sub_queue) REFERENCES pgq.queue(queue_id)
);

Таблица, содержащая информацию о получателях потребителей на очереди

pgq.tick

Cтруктура таблицы
CREATE TABLE pgq.tick (
tick_queue int4 NOT NULL, -- Queue ID
tick_id int8 NOT NULL, -- Tick ID
tick_time timestamptz NOT NULL DEFAULT now(), -- Tick timestamp
tick_snapshot txid_snapshot NOT NULL DEFAULT txid_current_snapshot(), -- Tick XID
tick_event_seq int8 NOT NULL, -- Tick event
CONSTRAINT tick_pkey PRIMARY KEY (tick_queue, tick_id),
CONSTRAINT tick_queue_fkey FOREIGN KEY (tick_queue) REFERENCES pgq.queue(queue_id)
);

Таблица, фиксирующая контрольные точки времени — метки для синхронизации обработки событий

Роли

В расширении pgq используются определенные роли, обеспечивающие контроль доступа к очередям, событиям и процессу их обработки:

Системные таблицы pgq

Роль

Описание

pgq_admin

Администратор расширения. Член ролей pgq_reader, pgq_writer. Позволяет создавать и управлять очередями сообщений

pgq_reader

Роль потребителя (consumer). Позволяет создавать сеанс сообщений, читать и управлять сообщением

pgq_writer

Роль отправителя очереди (provider). Позволяет помещать сообщения в очередь, в том числе с помощью триггеров

Настройка менеджера очередей pangolin-pgqd

Менеджер очереди является системным сервисом, предоставляющим элементы синхронизации (tick).

При запуске сервиса происходит:

  • подключение к сконфигурированной сущности;
  • поиск установленного расширения по всем базам данным, доступным для подключения;
  • вызов функции pgq.ticker() c заданной периодичностью;
  • вызов функций pgq.maint_rotate_tables_step1(), pgq.maint_rotate_tables_step2() c заданной периодичностью;
  • вызов функции pgq.maint_retry_events() с заданной периодичностью.

Изменение конфигурации pangolin-pgqd производится с использованием конфигурационного файла pangolin-pgqd.ini:

[pgqd]
# where to log
logfile = ~/log/pangolin-pgqd.log

# pidfile
pidfile = ~/pangolin-pgqd/pangolin-pgqd.pid

## optional parameters ##

# libpq connect string without dbname=
#base_connstr = host=<IP-адрес> port=<Порт> user=<Имя пользователя> password=<Пароль>

# startup db to query other databases
#initial_database = template1

# limit ticker to specific databases
#database_list =

# log into syslog
syslog = 1
syslog_ident = pgqd

## optional timeouts ##

# how often to check for new databases
check_period = 60

# how often to flush retry queue
retry_period = 30

# how often to do maintentance
#maint_period = 120

# how often to run ticker
ticker_period = 1

Описание конфигурационных параметров:

Конфигурационные параметры

Конфигурационный параметр

Значение по умолчанию

Описание

logfile

~/log/pangolin-pgqd.log

Путь к лог-файлу

pidfile

~/pangolin-pgqd/pangolin-pgqd.pid

Путь к pid-файлу

base_connstr

-

Строка подключения к сущности (libpq syntax). Не должна включать имя базы данных

initial_database

template1

База данных для автоматического подключения (при отсутствии параметра database_list)

database_list

-

Список баз данных для подключения. Если не задан — производится автоматический поиск

syslog

1

Признак логирования действий в syslog

syslog_ident

pgqd

Идентификатор syslog

check_period

60

Периодичность поиска новых баз данных (в случае автоматического поиска), секунды

retry_period

30

Периодичность поиска сообщений, повторно добавляемых в очередь, секунды

maint_period

120

Периодичность проведения регламентных операций, секунды

ticker_period

1

Периодичность синхронизации очередей, секунды

Доступные параметры командной строки при использовании расширения:

./pgqd --help
usage: pgq-ticker [switches] config.file
Switches:
-v Increase verbosity
-q No output to console
-d Daemonize
-h Show help
-V Show version
--ini Show sample config file
-s Stop - send SIGINT to running process
-k Kill - send SIGTERM to running process
-r Reload - send SIGHUP to running process

Где:

  • -v — повышение уровня подробности вывода;
  • -q — отключение вывода сообщений в консоль (режим тишины);
  • -d — запуск в фоновом режиме;
  • -h — отображение справки по использованию;
  • -V — отображение версии;
  • --ini — отображение примера конфигурационного файла;
  • -s — остановка выполнения — отправка сигнала SIGINT запущенному процессу;
  • -k — завершение работы — отправка сигнала SIGTERM запущенному процессу;
  • -r — перезагрузка конфигурации — отправка сигнала SIGHUP запущенному процессу.

Для окончания конфигурирования сервиса требуется его перезагрузка:

sudo systemctl reload pangolin-pgqd

Настройка расширения pgq

  1. Настройте количество таблиц очереди перед ее созданием, выполнив следующий запрос:

    SET ROLE tuz_pgq_admin;
    -- <n> - number of desired tables per queue
    ALTER TABLE pgq.queue ALTER COLUMN queue_ntables SET DEFAULT <n>;
  2. Настройте остальные параметры очереди:

    SET ROLE tuz_pgq_admin;
    -- <queue_param> one of:
    -- queue_ticker_max_count
    -- queue_ticker_max_lag
    -- queue_ticker_idle_period
    -- queue_ticker_paused
    -- queue_rotation_period
    -- queue_external_ticker
    SELECT * FROM pgq.set_queue_config('<queue_name>','<queue_param>','<param_value>');
  3. Проверьте наполнение очереди (количество элементов, скорость наполнения, работоспособность расширения):

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.get_queue_info();
  4. Проверьте количество сообщений, ожидающих обработки:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM get_consumer_info(['<queue_name>','<consumer_name>']);
  5. Настройте регламентные операции для таблиц очередей - добавьте дополнительные обработки для таблиц очередей:

    SET ROLE tuz_pgq_admin;
    UPDATE pgq.queue SET queue_extra_maint = ARRAY['<func1>','<func2>'] WHERE queue_name='<queue_name>';
Примечание

Изменение типа данных для сообщений (на json/b или xml) не рекомендуется. Основная функция для добавления сообщений pgq.insert_event_raw написана на языке C, скомпилирована в расширение и не предусматривает изменение состава полей.

Изменение позиции повторной отправки сообщения (event retry) не предусмотрена. Повторно отправляемое сообщение помещается в конец очереди.

Повторная приоритизация возможна созданием дополнительной очереди настройки приоритетов на уровне бизнес-логики.

Проверка работоспособности

При штатной работе менеджера очередей:

  • наблюдается увеличение значений поля last_tick_id;
  • проводятся периодические подключения к БД, что может приводить к росту объема логов. После установки функциональности рекомендуется пересмотреть настройки аудита для пользователя, под которым будет производиться работа менеджера очередей.
Примечание

При нештатной работе менеджера очередей наблюдаются одинаковые значение поля last_tick_id между итерациями, а так же возрастание значений в поле ticker_lag.

Использование модуля

Обозначения переменных

Далее в примерах имеются следующие обозначения:

  • <name_queue> - наименование очереди;
  • <type_event> - тип события;
  • <event_information> - информация о событии;
  • <name_consumer> - наименование потребителя;
  • <id_event> - идентификатор события;
  • <id_batch> - идентификатор последовательности;
  • <second_until_next_attempt> - количество секунд до повторной попытки выполнить действие.

Установка и проверка расширения pgq в СУБД Pangolin

  1. Создайте базу данных test_pgq и подключитесь к ней:

    CREATE DATABASE test_pgq;
    \c test_pgq
  2. Создайте расширение pgq:

    CREATE EXTENSION pgq;
  3. Проверьте наличие базы данных test_pgq и расширения pgq на ведущем сервере:

    \dx pgq

    Результатом выполнения — вывод версии расширения, например:

                 List of installed extensions
    ┌──────┬─────────┬────────────┬──────────────────────────────┐
    │ Name │ Version │ Schema │ Description │
    ├──────┼─────────┼────────────┼──────────────────────────────┤
    │ pgq │ 3.5.1 │ pg_catalog │ Generic queue for PostgreSQL │
    └──────┴─────────┴────────────┴──────────────────────────────┘
    (1 row)

Установка и запуск менеджера очередей pangolin-pgqd

  1. Откройте конфигурационный файл pangolin-pgqd.ini, убедитесь в наличии параметра logfile и закомментированного параметра database_list.

  2. Перезапустите менеджер соединений pangolin-pgqd, убедитесь в его запуске с помощью команд:

    sudo systemctl restart pangolin-pgqd;
    sudo systemctl status pangolin-pgqd;
  3. Проверьте автоматическое определение БД со стороны менеджера подключений pangolin-pgqd на ведущем сервере:

    sudo journalctl -n10 --no-pager -eu pangolin-pgqd

    Результат выполнения команды содержит строку LOG [test_pgq]: pgq version ok: {component_version}.

  4. Проверьте автоматическое определение БД со стороны менеджера подключений pangolin-pgqd на ведомом сервере:

    sudo journalctl -n10 --no-pager -eu pangolin-pgqd

    Результат выполнения команды содержит строки:

    ERROR test_pgq: ERROR:  cannot execute LOCK TABLE during recovery
    ERROR: cannot execute pg_current_xact_id() during recovery

Работа с очередью от роли потребителя

  1. Создание очереди:

    SELECT * FROM pgq.create_queue(<name_queue> TEXT);
  2. Добавление в очередь сообщения:

    SELECT * FROM pgq.insert_event(<name_queue> TEXT, <type_event> TEXT, <event_information> TEXT);
  3. Создание потребителя (consumer):

    SELECT * FROM pgq.register_consumer(<name_queue> TEXT, <name_consumer> TEXT);
    Внимание!

    Потребитель будет получать события только после собственной регистрации, поэтому возможность создавать события требует первичной «регистрации» потребителя.

  4. Получение идентификатора (ID) блока последовательных сообщений в очереди:

    SELECT * FROM pgq.next_batch(<name_queue> TEXT, <name_consumer> TEXT);
  5. Получение всех событий блока последовательных сообщений:

    SELECT * FROM pgq.get_batch_events(<id_batch> BIGINT);
    Примечание

    Возвращенное значение может быть пустым.

  6. Повторное помещение события в очередь:

    SELECT * FROM pgq.event_retry(<id_batch> BIGINT, <id_event> BIGINT, <second_until_next_attempt> INTEGER);

Работа с очередью от роли отправителя

  1. На ведущем сервере создайте пользователей для работы с ролевой моделью pgq:

    SET ROLE db_admin;
    \c test_pgq
    CREATE USER tuz_pgq_admin IN ROLE pgq_admin;
    CREATE USER tuz_pgq_reader IN ROLE pgq_reader;
    CREATE USER tuz_pgq_writer IN ROLE pgq_writer;
  2. Создайте очередь сообщений test_q:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.create_queue('test_q');
  3. Зарегистрируйте получателя сообщений:

    SET ROLE tuz_pgq_reader;
    SELECT pgq.register_consumer('test_q','test_consumer');
  4. Заполните очередь 10 раз наборами из 10000 сообщений:

    SET ROLE tuz_pgq_writer;
    DO
    $$
    DECLARE
    i int4;
    BEGIN
    FOR i IN 1..10 LOOP
    PERFORM pgq.insert_event('test_q',id::text,format('MSG %1$s',id)) from generate_series(1,1000) id;
    END LOOP;
    END
    $$;
    \watch 0
  5. Получите статистическую информацию по очереди и потребителям:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.get_queue_info()\gx
    SELECT * FROM pgq.get_consumer_info()\gx

    Результат выполнения команды — получение таблиц вида:

    SELECT * FROM pgq.get_queue_info()\gx
    ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
    │ queue_name │ test_q │
    │ queue_ntables │ 3
    │ queue_cur_table │ 0
    │ queue_rotation_period │ 02:00:00
    │ queue_switch_time │ 2023-11-16 12:57:13.566682+03
    │ queue_external_ticker │ f │
    │ queue_ticker_paused │ f │
    │ queue_ticker_max_count │ 500
    │ queue_ticker_max_lag │ 00:00:03
    │ queue_ticker_idle_period │ 00:01:00
    │ ticker_lag │ 00:00:04.742343
    │ ev_per_sec │ 770.3491295563064
    │ ev_new │ 0
    │ last_tick_id │ 53
    └──────────────────────────┴───────────────────────────────┘
    SELECT * FROM pgq.get_consumer_info()\gx
    ┌─[ RECORD 1 ]───┬─────────────────┐
    │ queue_name │ test_q │
    │ consumer_name │ test_consumer │
    │ lag │ 00:02:09.63673
    │ last_seen │ 00:01:56.731471
    │ last_tick │ 36
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ 125050
    └────────────────┴─────────────────┘
    Примечание

    Значения контрольных точек времени (tick) могут изменяться.

  6. Пример автоматизированного конвейера получения и обработки сеансов:

    SET ROLE tuz_pgq_reader;
    DO
    $$
    DECLARE
    v_batch_id int8;
    v_event_cnt int8;
    BEGIN
    SELECT batch_id FROM pgq.next_batch_info('test_q','test_consumer') INTO v_batch_id;
    IF v_batch_id ISNULL THEN
    RETURN;
    END IF;
    SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt;
    RAISE NOTICE 'Got % events from batch %', v_event_cnt::text,v_batch_id::TEXT;
    -- re-queue random 10 events at 5 secs;
    IF v_event_cnt > 1 THEN
    PERFORM pgq.event_retry(v_batch_id,ev_id,5) FROM pgq.get_batch_events(v_batch_id) ORDER BY random() LIMIT 10;
    END IF;
    PERFORM pgq.finish_batch(v_batch_id);
    END;
    $$;
    \watch 2

    При успешном результате наблюдается последовательное увеличение batch_id и периодическое получение сообщений до исчерпания очереди.

  7. Получите статистическую информацию по очереди и потребителям:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.get_queue_info()\gx
    SELECT * FROM pgq.get_consumer_info()\gx

    Результат выполнения команды — получение таблиц вида:

    SELECT * FROM pgq.get_queue_info()\gx
    ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
    │ queue_name │ test_q │
    │ queue_ntables │ 3
    │ queue_cur_table │ 0
    │ queue_rotation_period │ 02:00:00
    │ queue_switch_time │ 2023-11-16 12:57:13.566682+03
    │ queue_external_ticker │ f │
    │ queue_ticker_paused │ f │
    │ queue_ticker_max_count │ 500
    │ queue_ticker_max_lag │ 00:00:03
    │ queue_ticker_idle_period │ 00:01:00
    │ ticker_lag │ 00:00:28.175869
    │ ev_per_sec │ 568.1271702331466
    │ ev_new │ 0
    │ last_tick_id │ 63
    └──────────────────────────┴───────────────────────────────┘
    SELECT * FROM pgq.get_consumer_info()\gx
    ┌─[ RECORD 1 ]───┬─────────────────┐
    │ queue_name │ test_q │
    │ consumer_name │ test_consumer │
    │ lag │ 00:01:16.866497
    │ last_seen │ 00:01:16.028043
    │ last_tick │ 61
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ 20
    └────────────────┴─────────────────┘

    При успешном результате наблюдается увеличение last_tick_id, уменьшение размера очереди (pending_events).

  8. Удалите очередь:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.drop_queue(x_queue_name =>'test_q', x_force => TRUE);

    Результат выполнения команды — получение таблиц вида:

    ┌────────────┐
    │ drop_queue │
    ├────────────┤
    1
    └────────────┘

Диагностика ошибок

В этом разделе описана информация, с помощью которой можно диагностировать неполадки.

Менеджер очередей

Проверка менеджера очередей на уровне SQL выполняется аналогично контролю наполнения очереди. В нормальной ситуации:

  • поле last_tick_id регулярно увеличивается — это признак активности менеджера очередей и их обработки;
  • поле ticker_lag остается в пределах допустимого времени задержки.

При сбоях:

  • значение last_tick_id не изменяется между итерациями;
  • значение ticker_lag постепенно растет, что указывает на отсутствие новых событий от менеджера очередей.
Примечание

При нормальной работе менеджера очередей наблюдаются периодические подключения к БД. Это может вызвать рост объема логов при включенном аудите. Рекомендуется пересмотреть параметры аудита для роли, от имени которой работает менеджер очередей.

Очереди сообщений

Иногда требуется временно приостановить обработку очереди, например, при выполнении обслуживания или отладки. Это можно сделать вручную через вызовы SQL-функций.

Приостановление очереди на прием:

SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','true');

Возобновление работы очереди сообщений:

SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','false');

Ссылки на документацию разработчика

Функции расширения pgq: https://pgq.github.io/extension/pgq/files/external-sql.html.