Поиск…


замечания

  • Никогда не используйте DDL или DML для таблиц, созданных dbms_aqadm.create_queue_table . Используйте только dbms_aqadm и dbms_aq для работы с этими таблицами. Oracle может создавать несколько поддерживающих таблиц, индексов и т. Д., О которых вы не будете знать. Ручное управление DDL или DML с таблицей может привести к сценарию, в котором Oracle Support вам понадобится, чтобы удалить и воссоздать таблицу и очереди, чтобы решить эту проблему.

  • Настоятельно рекомендуется не использовать параметр dbms_aq.forever для ожидания. Это вызвало проблемы в прошлом, поскольку Oracle может начать планировать чрезмерное количество рабочих заданий для работы ненужных очередей (см. Oracle Doc ID 2001165.1).

  • Рекомендуется не устанавливать параметр AQ_TM_PROCESSES в версии 10.1 и новее. Особенно избегайте установки этого значения в ноль, поскольку это отключит фоновое задание QMON, которое необходимо для поддержания очередей. Вы можете сбросить это значение до значения по умолчанию Oracle, используя следующую команду и перезагрузив базу данных. alter system reset aq_tm_processes scope=spfile sid='*';

Простой производитель / потребитель

обзор

Создайте очередь, на которую мы можем отправить сообщение. Oracle уведомит нашу хранимую процедуру о том, что сообщение было помещено в очередь и должно быть обработано. Мы также добавим некоторые подпрограммы, которые мы можем использовать в экстренной ситуации, чтобы не допустить, чтобы сообщения были отклонены, снова разрешить переопределение и запустить простое пакетное задание для работы через все сообщения.

Эти примеры были протестированы на Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production.

Создать очередь

Мы создадим тип сообщения, таблицу очередей, в которой могут храниться сообщения, и очереди. Сообщения в очереди будут сначала дезертированы по приоритету, а затем их время очереди. Если что-то пойдет не так, работая над сообщением, а dequeue будет откат, AQ сделает сообщение доступным для dequeue 3600 секунд спустя. Он будет делать это 48 раз, прежде чем перемещать его в очередь исключений.

create type message_t as object 
   (
   sender  varchar2 ( 50 ),
   message varchar2 ( 512 )
   );
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
     queue_table        => 'MESSAGE_Q_TBL',
     queue_payload_type => 'MESSAGE_T',
     sort_list          => 'PRIORITY,ENQ_TIME',
     multiple_consumers =>  false,
     compatible         => '10.0.0');
  end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
     queue_name          => 'MESSAGE_Q',
     queue_table         => 'MESSAGE_Q_TBL',
     queue_type          =>  0,
     max_retries         =>  48,
     retry_delay         =>  3600,
     dependency_tracking =>  false);
  end;
/
-- PL/SQL procedure successfully completed.

Теперь, когда у нас есть место для размещения сообщений, вы можете создать пакет для управления и работы сообщений в очереди.

create or replace package message_worker_pkg
is
   queue_name_c constant varchar2(20) := 'MESSAGE_Q';
   
   -- allows the workers to process messages in the queue
   procedure enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.
   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number);

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages;
   
end;
/

create or replace package body message_worker_pkg
is
   -- raised by Oracle when we try to dequeue but no more messages are ready to
   -- be dequeued at this moment
   no_more_messages_ex          exception;
   pragma exception_init (no_more_messages_ex,
                          -25228);

   -- allows the workers to process messages in the queue
   procedure enable_dequeue
   as
   begin
      dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
   end enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue
   as
   begin
      dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
   end disable_dequeue;

   procedure work_message (message_in in out nocopy message_t)
   as
   begin
      dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
   end work_message;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.

   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number)
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.msgid         := descr.msg_id;
      dequeue_options_l.consumer_name := descr.consumer_name;
      dequeue_options_l.wait          := dbms_aq.no_wait;
      dbms_aq.dequeue (queue_name           => descr.queue_name,
                       dequeue_options      => dequeue_options_l,
                       message_properties   => message_properties_l,
                       payload              => message_l,
                       msgid                => message_id_l);
      work_message (message_l);
      commit;
   exception
      when no_more_messages_ex
      then
         -- it's possible work_old_messages already dequeued the message
         commit;
      when others
      then
         -- we don't need to have a raise here.  I just wanted to point out that
         -- since this will be called by AQ throwing the exception back to it
         -- will have it put the message back on the queue and retry later
         raise;
   end on_message_enqueued;

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.wait       := dbms_aq.no_wait;
      dequeue_options_l.navigation := dbms_aq.first_message;

      while (true) loop -- way out is no_more_messages_ex
         dbms_aq.dequeue (queue_name           => queue_name_c,
                          dequeue_options      => dequeue_options_l,
                          message_properties   => message_properties_l,
                          payload              => message_l,
                          msgid                => message_id_l);
         work_message (message_l);
         commit;
      end loop;
   exception
      when no_more_messages_ex
      then
         null;
   end work_old_messages;
end;

Затем скажите AQ, что, когда сообщение помещено в MESSAGE_Q (и зафиксировано), уведомление о нашей процедуре должно быть выполнено. AQ запустит работу на своем собственном сеансе, чтобы справиться с этим.

begin
  dbms_aq.register (
     sys.aq$_reg_info_list (
        sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
                          dbms_aq.namespace_aq,
                          'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
                          hextoraw ('FF'))),
     1);
  commit;
end; 

Запустить очередь и отправить сообщение

declare
   enqueue_options_l      dbms_aq.enqueue_options_t;
   message_properties_l   dbms_aq.message_properties_t;
   message_id_l           raw (16);
   message_l              message_t;
begin
   -- only need to do this next line ONCE
   dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
   
   message_l := new message_t ( 'Jon', 'Hello, world!' );
   dbms_aq.enqueue (queue_name           => message_worker_pkg.queue_name_c,
                    enqueue_options      => enqueue_options_l,
                    message_properties   => message_properties_l,
                    payload              => message_l,
                    msgid                => message_id_l);
   commit;
end;


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow