Oracle Database
Oracle Advanced Queuing (AQ)
Поиск…
замечания
Никогда не используйте DDL или DML для таблиц, созданных
dbms_aqadm.create_queue_table
. Используйте только dbms_aqadm и dbms_aq для работы с этими таблицами. Oracle может создавать несколько поддерживающих таблиц, индексов и т. Д., О которых вы не будете знать. Ручное управление DDL или DML с таблицей может привести к сценарию, в котором Oracle Support вам понадобится, чтобы удалить и воссоздать таблицу и очереди, чтобы решить эту проблему.Настоятельно рекомендуется не использовать параметр
dbms_aq.forever
для ожидания. Это вызвало проблемы в прошлом, поскольку Oracle может начать планировать чрезмерное количество рабочих заданий для работы ненужных очередей (см. Oracle Doc ID 2001165.1).Рекомендуется не устанавливать параметр AQ_TM_PROCESSES в версии 10.1 и новее. Особенно избегайте установки этого значения в ноль, поскольку это отключит фоновое задание QMON, которое необходимо для поддержания очередей. Вы можете сбросить это значение до значения по умолчанию Oracle, используя следующую команду и перезагрузив базу данных.
alter system reset aq_tm_processes scope=spfile sid='*';
Простой производитель / потребитель
обзор
Создайте очередь, на которую мы можем отправить сообщение. Oracle уведомит нашу хранимую процедуру о том, что сообщение было помещено в очередь и должно быть обработано. Мы также добавим некоторые подпрограммы, которые мы можем использовать в экстренной ситуации, чтобы не допустить, чтобы сообщения были отклонены, снова разрешить переопределение и запустить простое пакетное задание для работы через все сообщения.
Эти примеры были протестированы на Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production.
Создать очередь
Мы создадим тип сообщения, таблицу очередей, в которой могут храниться сообщения, и очереди. Сообщения в очереди будут сначала дезертированы по приоритету, а затем их время очереди. Если что-то пойдет не так, работая над сообщением, а dequeue будет откат, AQ сделает сообщение доступным для dequeue 3600 секунд спустя. Он будет делать это 48 раз, прежде чем перемещать его в очередь исключений.
create type message_t as object
(
sender varchar2 ( 50 ),
message varchar2 ( 512 )
);
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
queue_table => 'MESSAGE_Q_TBL',
queue_payload_type => 'MESSAGE_T',
sort_list => 'PRIORITY,ENQ_TIME',
multiple_consumers => false,
compatible => '10.0.0');
end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
queue_name => 'MESSAGE_Q',
queue_table => 'MESSAGE_Q_TBL',
queue_type => 0,
max_retries => 48,
retry_delay => 3600,
dependency_tracking => false);
end;
/
-- PL/SQL procedure successfully completed.
Теперь, когда у нас есть место для размещения сообщений, вы можете создать пакет для управления и работы сообщений в очереди.
create or replace package message_worker_pkg
is
queue_name_c constant varchar2(20) := 'MESSAGE_Q';
-- allows the workers to process messages in the queue
procedure enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number);
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages;
end;
/
create or replace package body message_worker_pkg
is
-- raised by Oracle when we try to dequeue but no more messages are ready to
-- be dequeued at this moment
no_more_messages_ex exception;
pragma exception_init (no_more_messages_ex,
-25228);
-- allows the workers to process messages in the queue
procedure enable_dequeue
as
begin
dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
end enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue
as
begin
dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
end disable_dequeue;
procedure work_message (message_in in out nocopy message_t)
as
begin
dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
end work_message;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number)
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.msgid := descr.msg_id;
dequeue_options_l.consumer_name := descr.consumer_name;
dequeue_options_l.wait := dbms_aq.no_wait;
dbms_aq.dequeue (queue_name => descr.queue_name,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
exception
when no_more_messages_ex
then
-- it's possible work_old_messages already dequeued the message
commit;
when others
then
-- we don't need to have a raise here. I just wanted to point out that
-- since this will be called by AQ throwing the exception back to it
-- will have it put the message back on the queue and retry later
raise;
end on_message_enqueued;
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.wait := dbms_aq.no_wait;
dequeue_options_l.navigation := dbms_aq.first_message;
while (true) loop -- way out is no_more_messages_ex
dbms_aq.dequeue (queue_name => queue_name_c,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
end loop;
exception
when no_more_messages_ex
then
null;
end work_old_messages;
end;
Затем скажите AQ, что, когда сообщение помещено в MESSAGE_Q (и зафиксировано), уведомление о нашей процедуре должно быть выполнено. AQ запустит работу на своем собственном сеансе, чтобы справиться с этим.
begin
dbms_aq.register (
sys.aq$_reg_info_list (
sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
dbms_aq.namespace_aq,
'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
hextoraw ('FF'))),
1);
commit;
end;
Запустить очередь и отправить сообщение
declare
enqueue_options_l dbms_aq.enqueue_options_t;
message_properties_l dbms_aq.message_properties_t;
message_id_l raw (16);
message_l message_t;
begin
-- only need to do this next line ONCE
dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
message_l := new message_t ( 'Jon', 'Hello, world!' );
dbms_aq.enqueue (queue_name => message_worker_pkg.queue_name_c,
enqueue_options => enqueue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
commit;
end;