Перейти к основному содержимому

pgq_coop. Организация кооперативных очередей сообщений

Версия: 3.4.

В исходном дистрибутиве установлено по умолчанию: нет.

Связанные компоненты: pgq.

Схема размещения: pgq_coop.

Описание

Опциональное расширение, реализованное на языке plpgsql и предназначенное для организации кооперативных очередей. Для одного генератора сообщений могут присутствовать несколько получателей.

«subconsumer» — актор очереди сообщений, функционально являющийся частью потребителя consumer, далее субпотребитель.

Функции

Расширение предоставляет следующие функции:

Функции расширения pgq_coop

Функция
Входные параметры
Описание
pgq_coop.finish_batch
batch_id (INT8)
Завершение обработки сеанса сообщений. Решение принимается солидарно для всех субпотребителей.

Параметры функции:

- batch_id — идентификатор сеанса сообщений
pgq_coop.next_batch
queue_name (TEXT)
consumer_name (TEXT)
subconsumer_name (TEXT)
dead_interval (INTERVAL)
Получение информации о сеансе сообщений для субпотребителя. Аналог функции pgq.next_batch.

Параметры функции:

- batch_id — идентификатор сеанса сообщений;
- queue_name — имя очереди;
- consumer_name — имя потребителя;
- subconsumer_name — имя субпотребителя;
- dead_interval — время, после которого сеанс считается «зависшим»
pgq_coop.next_batch_custom
queue_name (TEXT)
consumer_name (TEXT)
subconsumer_name (TEXT)
min_lag (INTERVAL)
min_count (INT4)
min_interval (INTERVAL)
dead_interval (INTERVAL)
Расширенное получение информации о сеансе сообщений. Аналог функции pgq.next_batch_custom.

Параметры функции:

- queue_name — имя очереди;
- consumer_name — имя потребителя;
- subconsumer_name — имя субпотребителя;
- min_lag — минимальная задержка для событий;
- min_count — минимальное количество событий в сеансе сообщений;
- min_interval — минимальный интервал между событиями;
- dead_interval — время, после которого сеанс считается «зависшим»
pgq_coop.register_subconsumer
queue_name (TEXT)
consumer_name (TEXT)
subconsumer_name (TEXT)
Регистрация субпотребителя в очереди.

Параметры функции:

- queue_name — имя очереди;
- consumer_name — имя потребителя;
- subconsumer_name — имя субпотребителя
pgq_coop.unregister_subconsumer
queue_name (TEXT)
consumer_name (TEXT)
subconsumer_name (TEXT)
batch_handling (INT)
Отмена регистрации субпотребителя в очереди.

Параметры функции:

- queue_name — имя очереди;
- consumer_name — имя потребителя;
- subconsumer_name — имя субпотребителя;
- batch_handling — код, указывающий, как завершать текущую обработку:

- 0 — игнорировать незавершенные сеансы;
- 1 — завершить сеанс автоматически;
- 2— завершить с ошибкой
pgq_coop.version
-
Версия расширения.

Доработка

Доработка не проводилась.

Ограничения

Требуется предварительная установка расширения pgq.

Установка

sudo dnf install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm
Подсказка

Пример заполненной команды:

cd distributive

sudo apt-get -y install pangolin-pgq-coop-3.4-rhel8.7.x86_64.rpm

Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE в текущей базе данных:

CREATE EXTENSION pgq_coop;

Настройка

Настройка не требуется.

Использование модуля

Работа с кооперативной очередью

  1. Создайте расширение pgq_coop:

    SET ROLE db_admin;
    CREATE EXTENSION pgq_coop;
    \dx pgq_coop

    Результатом выполнения — вывод версии расширения, например:

                                          List of installed extensions
    ┌──────────┬─────────┬────────────┬─────────────────────────────────────┐
    │ Name │ Version │ Schema │ Description │
    ├──────────┼─────────┼────────────┼─────────────────────────────────────┤
    │ pgq_coop │ 3.4 │ pg_catalog │ Cooperative queue consuming for PgQ │
    └──────────┴─────────┴────────────┴─────────────────────────────────────┘ (1 row)
  2. Cоздайте очередь coop_q:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.create_queue('coop_q');
  3. Создайте для очереди coop_q потребителя и два субпотребителя, связанных с ним (subconsumer):

    SET ROLE tuz_pgq_reader;
    SELECT pgq.register_consumer('coop_q','test_consumer');
    SELECT pgq_coop.register_subconsumer('coop_q','test_consumer','test_subconsumer1');
    SELECT pgq_coop.register_subconsumer('coop_q','test_consumer','test_subconsumer2');
  4. Заполните очередь десятью блоками по 10000 сообщений:

    SET ROLE tuz_pgq_writer;
    DO
    $$
    DECLARE
    i int4;
    BEGIN
    FOR i IN 1..10 LOOP
    PERFORM pgq.insert_event('coop_q',id::text,format('MSG %1$s',id)) from generate_series(1,1000) id;
    END LOOP;
    END
    $$;
    \watch 0
  5. Получите статистическую информацию по очереди и потребителям:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.get_queue_info()\gx
    SELECT * FROM pgq.get_consumer_info()\gx

    Результат выполнения команды — получение таблиц вида:

    SELECT * FROM pgq.get_queue_info()\gx
    ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
    │ queue_name │ coop_q │
    │ queue_ntables │ 3
    │ queue_cur_table │ 0
    │ queue_rotation_period │ 02:00:00
    │ queue_switch_time │ 2023-11-16 13:08:57.838897+03
    │ queue_external_ticker │ f │
    │ queue_ticker_paused │ f │
    │ queue_ticker_max_count │ 500
    │ queue_ticker_max_lag │ 00:00:03
    │ queue_ticker_idle_period │ 00:01:00
    │ ticker_lag │ 00:00:04.829513
    │ ev_per_sec │ 2041.7830216193786
    │ ev_new │ 0
    │ last_tick_id │ 20
    └──────────────────────────┴───────────────────────────────┘
    SELECT * FROM pgq.get_consumer_info()\gx
    ┌─[ RECORD 1 ]───┬─────────────────────────────────┐
    │ queue_name │ coop_q │
    │ consumer_name │ test_consumer │
    │ lag │ 00:00:56.706897
    │ last_seen │ 00:00:55.567714
    │ last_tick │ 4
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ 119999
    ├─[ RECORD 2 ]───┼─────────────────────────────────┤
    │ queue_name │ coop_q │
    │ consumer_name │ test_consumer.test_subconsumer1 │
    │ lag │ NULL
    │ last_seen │ 00:00:55.562226
    │ last_tick │ NULL
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ NULL
    ├─[ RECORD 3 ]───┼─────────────────────────────────┤
    │ queue_name │ coop_q │
    │ consumer_name │ test_consumer.test_subconsumer2 │
    │ lag │ NULL
    │ last_seen │ 00:00:54.789057
    │ last_tick │ NULL
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ NULL
    └────────────────┴─────────────────────────────────┘
  6. Пример автоматизированного конвейра получения и обработки сеансов:

    SET ROLE tuz_pgq_reader;
    DO
    $$
    DECLARE
    v_batch_id int8;
    v_event_cnt int8;
    BEGIN
    -- subconsumer 1
    SELECT next_batch FROM pgq_coop.next_batch('coop_q','test_consumer','test_subconsumer1') INTO v_batch_id;
    IF v_batch_id ISNULLNOTNULL THEN
    RETURN;
    END IF;
    SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt;
    RAISE NOTICE 'Got % events from batch for subconsumer1 %', v_event_cnt::text,v_batch_id::TEXT;
    PERFORM pgq_coop.finish_batch(v_batch_id);
    END IF;
    --subconsumer 2
    SELECT next_batch FROM pgq_coop.next_batch('coop_q','test_consumer','test_subconsumer2') INTO v_batch_id;
    IF v_batch_id ISNULLNOTNULL THEN
    RETURN;
    END IF;
    SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt;
    RAISE NOTICE 'Got % events from batch for subconsumer2 %', v_event_cnt::text,v_batch_id::TEXT;
    PERFORM pgq_coop.finish_batch(v_batch_id);
    END IF;
    END;
    $$;
    \watch 1

    При успешном результате наблюдается последовательное увеличение batch_id и периодическое получение сообщений до исчерпания очереди.

  7. Получите статистическую информацию по очереди и потребителям:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.get_queue_info()\gx
    SELECT * FROM pgq.get_consumer_info()\gx

    Результат выполнения команды — получение таблиц вида:

    SELECT * FROM pgq.get_queue_info()\gx
    ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
    │ queue_name │ coop_q │
    │ queue_ntables │ 3
    │ queue_cur_table │ 0
    │ queue_rotation_period │ 02:00:00
    │ queue_switch_time │ 2023-11-15 14:18:03.430572+03
    │ queue_external_ticker │ f │
    │ queue_ticker_paused │ f │
    │ queue_ticker_max_count │ 500
    │ queue_ticker_max_lag │ 00:00:03
    │ queue_ticker_idle_period │ 00:01:00
    │ ticker_lag │ 00:00:58.33156
    │ ev_per_sec │ 11.074713891632932
    │ ev_new │ 0
    │ last_tick_id │ 49
    └──────────────────────────┴───────────────────────────────┘
    SELECT * FROM pgq.get_consumer_info()\gx
    ┌─[ RECORD 1 ]───┬─────────────────────────────────┐
    │ queue_name │ coop_q │
    │ consumer_name │ test_consumer │
    │ lag │ 00:04:46.626124
    │ last_seen │ 00:00:51.636231
    │ last_tick │ 46
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ 0
    ├─[ RECORD 2 ]───┼─────────────────────────────────┤
    │ queue_name │ coop_q │
    │ consumer_name │ test_consumer.test_subconsumer1 │
    │ lag │ NULL
    │ last_seen │ 00:00:51.636231
    │ last_tick │ NULL
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ NULL
    ├─[ RECORD 3 ]───┼─────────────────────────────────┤
    │ queue_name │ coop_q │
    │ consumer_name │ test_consumer.test_subconsumer2 │
    │ lag │ NULL
    │ last_seen │ 00:00:51.636231
    │ last_tick │ NULL
    │ current_batch │ NULL
    │ next_tick │ NULL
    │ pending_events │ NULL
    └────────────────┴─────────────────────────────────┘
  8. Удалите очередь:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.drop_queue(x_queue_name =>'coop_q', x_force => TRUE);
    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.drop_queue(x_queue_name =>'coop_q', x_force => TRUE);

    Результат выполнения команды — очередь удалена.

    Примечание

    Прав tuz_pgq_reader недостаточно для удаления очереди. Прав tuz_pgq_admin достаточно для удаления очереди.

Ссылки на документацию разработчика

Функции расширения pgq_coop: https://github.com/pgq/pgq-coop.