pgq_coop. Организация кооперативных очередей сообщений
Версия: 3.4.
В исходном дистрибутиве установлено по умолчанию: нет.
Связанные компоненты:
pgq
.Схема размещения:
pgq_coop
.
Описание
Опциональное расширение, реализованное на языке plpgsql
и предназначенное для организации кооперативных очередей. Для одного генератора сообщений могут присутствовать несколько получателей.
«subconsumer» — актор очереди сообщений, функционально являющийся частью потребителя consumer, далее субпотребитель.
Функции
Расширение предоставляет следующие функции:
Функции расширения pgq_coop
Функция | Входные параметры | Описание |
pgq_coop.finish_batch | batch_id (INT8 ) | Завершение обработки сеанса сообщений. Решение принимается солидарно для всех субпотребителей. Параметры функции: - batch_id — идентификатор сеанса сообщений |
pgq_coop.next_batch | queue_name (TEXT ) consumer_name (TEXT ) subconsumer_name (TEXT ) dead_interval (INTERVAL ) | Получение информации о сеансе сообщений для субпотребителя. Аналог функции pgq.next_batch . Параметры функции: - batch_id — идентификатор сеанса сообщений; - queue_name — имя очереди; - consumer_name — имя потребителя; - subconsumer_name — имя субпотребителя; - dead_interval — время, после которого сеанс считается «зависшим» |
pgq_coop.next_batch_custom | queue_name (TEXT ) consumer_name (TEXT ) subconsumer_name (TEXT ) min_lag (INTERVAL ) min_count (INT4 ) min_interval (INTERVAL ) dead_interval (INTERVAL ) | Расширенное получение информации о сеансе сообщений. Аналог функции pgq.next_batch_custom . Параметры функции: - queue_name — имя очереди; - consumer_name — имя потребителя; - subconsumer_name — имя субпотребителя; - min_lag — минимальная задержка для событий; - min_count — минимальное количество событий в сеансе сообщений; - min_interval — минимальный интервал между событиями; - dead_interval — время, после которого сеанс считается «зависшим» |
pgq_coop.register_subconsumer | queue_name (TEXT ) consumer_name (TEXT ) subconsumer_name (TEXT ) | Регистрация субпотребителя в очереди. Параметры функции: - queue_name — имя очереди; - consumer_name — имя потребителя; - subconsumer_name — имя субпотребителя |
pgq_coop.unregister_subconsumer | queue_name (TEXT ) consumer_name (TEXT ) subconsumer_name (TEXT ) batch_handling (INT ) | Отмена регистрации субпотребителя в очереди. Параметры функции: - queue_name — имя очереди; - consumer_name — имя потребителя; - subconsumer_name — имя субпотребителя; - batch_handling — код, указывающий, как завершать текущую обработку: - 0 — игнорировать незавершенные сеансы; - 1 — завершить сеанс автоматически; - 2— завершить с ошибкой |
pgq_coop.version | - | Версия расширения. |
Доработка
Доработка не проводилась.
Ограничения
Требуется предварительная установка расширения pgq
.
Установка
- SberLinux, РЕД ОС, CentOS
- Red Hat Enterprise Linux
- Astra Linux
- Альт СП
sudo dnf install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm
sudo yum install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm
sudo apt install pangolin-pgq-coop-{version_component}_amd64.deb
sudo apt-get install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm
Пример заполненной команды:
cd distributive
sudo apt-get -y install pangolin-pgq-coop-3.4-rhel8.7.x86_64.rpm
Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE
в текущей базе данных:
CREATE EXTENSION pgq_coop;
Настройка
Настройка не требуется.
Использование модуля
Работа с кооперативной очередью
-
Создайте расширение
pgq_coop
:SET ROLE db_admin;
CREATE EXTENSION pgq_coop;
\dx pgq_coopРезультатом выполнения — вывод версии расширения, например:
List of installed extensions
┌──────────┬─────────┬────────────┬─────────────────────────────────────┐
│ Name │ Version │ Schema │ Description │
├──────────┼─────────┼────────────┼─────────────────────────────────────┤
│ pgq_coop │ 3.4 │ pg_catalog │ Cooperative queue consuming for PgQ │
└──────────┴─────────┴────────────┴─────────────────────────────────────┘ (1 row) -
Cоздайте очередь
coop_q
:SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.create_queue('coop_q'); -
Создайте для очереди
coop_q
потребителя и два субпотребителя, связанных с ним (subconsumer):SET ROLE tuz_pgq_reader;
SELECT pgq.register_consumer('coop_q','test_consumer');
SELECT pgq_coop.register_subconsumer('coop_q','test_consumer','test_subconsumer1');
SELECT pgq_coop.register_subconsumer('coop_q','test_consumer','test_subconsumer2'); -
Заполните очередь десятью блоками по 10000 сообщений:
SET ROLE tuz_pgq_writer;
DO
$$
DECLARE
i int4;
BEGIN
FOR i IN 1..10 LOOP
PERFORM pgq.insert_event('coop_q',id::text,format('MSG %1$s',id)) from generate_series(1,1000) id;
END LOOP;
END
$$;
\watch 0 -
Получите статистическую информацию по очереди и потребителям:
SET ROLE tuz_pgq_reader;
SELECT * FROM pgq.get_queue_info()\gx
SELECT * FROM pgq.get_consumer_info()\gxРезультат выполнения команды — получение таблиц вида:
SELECT * FROM pgq.get_queue_info()\gx
┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
│ queue_name │ coop_q │
│ queue_ntables │ 3 │
│ queue_cur_table │ 0 │
│ queue_rotation_period │ 02:00:00 │
│ queue_switch_time │ 2023-11-16 13:08:57.838897+03 │
│ queue_external_ticker │ f │
│ queue_ticker_paused │ f │
│ queue_ticker_max_count │ 500 │
│ queue_ticker_max_lag │ 00:00:03 │
│ queue_ticker_idle_period │ 00:01:00 │
│ ticker_lag │ 00:00:04.829513 │
│ ev_per_sec │ 2041.7830216193786 │
│ ev_new │ 0 │
│ last_tick_id │ 20 │
└──────────────────────────┴───────────────────────────────┘
SELECT * FROM pgq.get_consumer_info()\gx
┌─[ RECORD 1 ]───┬─────────────────────────────────┐
│ queue_name │ coop_q │
│ consumer_name │ test_consumer │
│ lag │ 00:00:56.706897 │
│ last_seen │ 00:00:55.567714 │
│ last_tick │ 4 │
│ current_batch │ NULL │
│ next_tick │ NULL │
│ pending_events │ 119999 │
├─[ RECORD 2 ]───┼─────────────────────────────────┤
│ queue_name │ coop_q │
│ consumer_name │ test_consumer.test_subconsumer1 │
│ lag │ NULL │
│ last_seen │ 00:00:55.562226 │
│ last_tick │ NULL │
│ current_batch │ NULL │
│ next_tick │ NULL │
│ pending_events │ NULL │
├─[ RECORD 3 ]───┼─────────────────────────────────┤
│ queue_name │ coop_q │
│ consumer_name │ test_consumer.test_subconsumer2 │
│ lag │ NULL │
│ last_seen │ 00:00:54.789057 │
│ last_tick │ NULL │
│ current_batch │ NULL │
│ next_tick │ NULL │
│ pending_events │ NULL │
└────────────────┴─────────────────────────────────┘ -
Пример автоматизированного конвейра получения и обработки сеансов:
SET ROLE tuz_pgq_reader;
DO
$$
DECLARE
v_batch_id int8;
v_event_cnt int8;
BEGIN
-- subconsumer 1
SELECT next_batch FROM pgq_coop.next_batch('coop_q','test_consumer','test_subconsumer1') INTO v_batch_id;
IF v_batch_id ISNULLNOTNULL THEN
RETURN;
END IF;
SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt;
RAISE NOTICE 'Got % events from batch for subconsumer1 %', v_event_cnt::text,v_batch_id::TEXT;
PERFORM pgq_coop.finish_batch(v_batch_id);
END IF;
--subconsumer 2
SELECT next_batch FROM pgq_coop.next_batch('coop_q','test_consumer','test_subconsumer2') INTO v_batch_id;
IF v_batch_id ISNULLNOTNULL THEN
RETURN;
END IF;
SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt;
RAISE NOTICE 'Got % events from batch for subconsumer2 %', v_event_cnt::text,v_batch_id::TEXT;
PERFORM pgq_coop.finish_batch(v_batch_id);
END IF;
END;
$$;
\watch 1При успешном результате наблюдается последовательное увеличение
batch_id
и периодическое получение сообщений до исчерпания очереди. -
Получите статистическую информацию по очереди и потребителям:
SET ROLE tuz_pgq_reader;
SELECT * FROM pgq.get_queue_info()\gx
SELECT * FROM pgq.get_consumer_info()\gxРезультат выполнения команды — получение таблиц вида:
SELECT * FROM pgq.get_queue_info()\gx
┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
│ queue_name │ coop_q │
│ queue_ntables │ 3 │
│ queue_cur_table │ 0 │
│ queue_rotation_period │ 02:00:00 │
│ queue_switch_time │ 2023-11-15 14:18:03.430572+03 │
│ queue_external_ticker │ f │
│ queue_ticker_paused │ f │
│ queue_ticker_max_count │ 500 │
│ queue_ticker_max_lag │ 00:00:03 │
│ queue_ticker_idle_period │ 00:01:00 │
│ ticker_lag │ 00:00:58.33156 │
│ ev_per_sec │ 11.074713891632932 │
│ ev_new │ 0 │
│ last_tick_id │ 49 │
└──────────────────────────┴───────────────────────────────┘
SELECT * FROM pgq.get_consumer_info()\gx
┌─[ RECORD 1 ]───┬─────────────────────────────────┐
│ queue_name │ coop_q │
│ consumer_name │ test_consumer │
│ lag │ 00:04:46.626124 │
│ last_seen │ 00:00:51.636231 │
│ last_tick │ 46 │
│ current_batch │ NULL │
│ next_tick │ NULL │
│ pending_events │ 0 │
├─[ RECORD 2 ]───┼─────────────────────────────────┤
│ queue_name │ coop_q │
│ consumer_name │ test_consumer.test_subconsumer1 │
│ lag │ NULL │
│ last_seen │ 00:00:51.636231 │
│ last_tick │ NULL │
│ current_batch │ NULL │
│ next_tick │ NULL │
│ pending_events │ NULL │
├─[ RECORD 3 ]───┼─────────────────────────────────┤
│ queue_name │ coop_q │
│ consumer_name │ test_consumer.test_subconsumer2 │
│ lag │ NULL │
│ last_seen │ 00:00:51.636231 │
│ last_tick │ NULL │
│ current_batch │ NULL │
│ next_tick │ NULL │
│ pending_events │ NULL │
└────────────────┴─────────────────────────────────┘ -
Удалите очередь:
SET ROLE tuz_pgq_reader;
SELECT * FROM pgq.drop_queue(x_queue_name =>'coop_q', x_force => TRUE);
SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.drop_queue(x_queue_name =>'coop_q', x_force => TRUE);Результат выполнения команды — очередь удалена.
ПримечаниеПрав
tuz_pgq_reader
недостаточно для удаления очереди. Правtuz_pgq_admin
достаточно для удаления очереди.
Ссылки на документацию разработчика
Функции расширения pgq_coop
: https://github.com/pgq/pgq-coop.