Buscar..


Observaciones

  • Nunca use DDL o DML contra tablas creadas por dbms_aqadm.create_queue_table . Utilice solo dbms_aqadm y dbms_aq para trabajar con estas tablas. Oracle puede crear varias tablas de soporte, índices, etc. de los que no tendrá conocimiento. La ejecución manual de DDL o DML en la tabla puede llevarlo a un escenario en el que el soporte de Oracle necesitará que suelte y vuelva a crear la tabla y las colas para resolver la situación.

  • Se recomienda encarecidamente que no utilice dbms_aq.forever para una opción de espera. Esto ha provocado problemas en el pasado, ya que Oracle puede comenzar a programar un número excesivo de trabajos de los trabajadores para que funcionen en las colas que no son necesarias (consulte Oracle Doc ID 2001165.1).

  • Se recomienda que no configure el parámetro AQ_TM_PROCESSES en la versión 10.1 y posteriores. Evite especialmente establecer esto en cero ya que esto deshabilitará el trabajo de fondo QMON que es necesario para mantener las colas. Puede restablecer este valor al valor predeterminado de Oracle utilizando el siguiente comando y reiniciando la base de datos. alter system reset aq_tm_processes scope=spfile sid='*';

Productor / Consumidor Simple

Visión general

Crea una cola a la que podamos enviar un mensaje. Oracle notificará a nuestro procedimiento almacenado que un mensaje ha sido puesto en cola y debe ser trabajado. También agregaremos algunos subprogramas que podemos usar en una emergencia para evitar que los mensajes se desactualicen, permitir que se vuelva a poner en cola y ejecutar un trabajo por lotes simple para trabajar con todos los mensajes.

Estos ejemplos se probaron en Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production.

Crear cola

Crearemos un tipo de mensaje, una tabla de cola que puede contener los mensajes y una cola. Los mensajes en la cola se eliminarán en primer lugar por prioridad y luego su tiempo de salida. Si algo sale mal al trabajar el mensaje y la salida de la cola se retrotrae, AQ hará que el mensaje esté disponible para la salida 3600 segundos después. Lo hará 48 veces antes de moverlo a una cola de excepciones.

create type message_t as object 
   (
   sender  varchar2 ( 50 ),
   message varchar2 ( 512 )
   );
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
     queue_table        => 'MESSAGE_Q_TBL',
     queue_payload_type => 'MESSAGE_T',
     sort_list          => 'PRIORITY,ENQ_TIME',
     multiple_consumers =>  false,
     compatible         => '10.0.0');
  end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
     queue_name          => 'MESSAGE_Q',
     queue_table         => 'MESSAGE_Q_TBL',
     queue_type          =>  0,
     max_retries         =>  48,
     retry_delay         =>  3600,
     dependency_tracking =>  false);
  end;
/
-- PL/SQL procedure successfully completed.

Ahora que tenemos un lugar para colocar los mensajes, podemos crear un paquete para administrar y trabajar los mensajes en la cola.

create or replace package message_worker_pkg
is
   queue_name_c constant varchar2(20) := 'MESSAGE_Q';
   
   -- allows the workers to process messages in the queue
   procedure enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.
   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number);

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages;
   
end;
/

create or replace package body message_worker_pkg
is
   -- raised by Oracle when we try to dequeue but no more messages are ready to
   -- be dequeued at this moment
   no_more_messages_ex          exception;
   pragma exception_init (no_more_messages_ex,
                          -25228);

   -- allows the workers to process messages in the queue
   procedure enable_dequeue
   as
   begin
      dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
   end enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue
   as
   begin
      dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
   end disable_dequeue;

   procedure work_message (message_in in out nocopy message_t)
   as
   begin
      dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
   end work_message;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.

   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number)
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.msgid         := descr.msg_id;
      dequeue_options_l.consumer_name := descr.consumer_name;
      dequeue_options_l.wait          := dbms_aq.no_wait;
      dbms_aq.dequeue (queue_name           => descr.queue_name,
                       dequeue_options      => dequeue_options_l,
                       message_properties   => message_properties_l,
                       payload              => message_l,
                       msgid                => message_id_l);
      work_message (message_l);
      commit;
   exception
      when no_more_messages_ex
      then
         -- it's possible work_old_messages already dequeued the message
         commit;
      when others
      then
         -- we don't need to have a raise here.  I just wanted to point out that
         -- since this will be called by AQ throwing the exception back to it
         -- will have it put the message back on the queue and retry later
         raise;
   end on_message_enqueued;

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.wait       := dbms_aq.no_wait;
      dequeue_options_l.navigation := dbms_aq.first_message;

      while (true) loop -- way out is no_more_messages_ex
         dbms_aq.dequeue (queue_name           => queue_name_c,
                          dequeue_options      => dequeue_options_l,
                          message_properties   => message_properties_l,
                          payload              => message_l,
                          msgid                => message_id_l);
         work_message (message_l);
         commit;
      end loop;
   exception
      when no_more_messages_ex
      then
         null;
   end work_old_messages;
end;

A continuación, informe a AQ que cuando se envía un mensaje a MESSAGE_Q (y se confirma), notifique a nuestro procedimiento que tiene trabajo que hacer. AQ iniciará un trabajo en su propia sesión para manejar esto.

begin
  dbms_aq.register (
     sys.aq$_reg_info_list (
        sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
                          dbms_aq.namespace_aq,
                          'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
                          hextoraw ('FF'))),
     1);
  commit;
end; 

Iniciar cola y enviar un mensaje

declare
   enqueue_options_l      dbms_aq.enqueue_options_t;
   message_properties_l   dbms_aq.message_properties_t;
   message_id_l           raw (16);
   message_l              message_t;
begin
   -- only need to do this next line ONCE
   dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
   
   message_l := new message_t ( 'Jon', 'Hello, world!' );
   dbms_aq.enqueue (queue_name           => message_worker_pkg.queue_name_c,
                    enqueue_options      => enqueue_options_l,
                    message_properties   => message_properties_l,
                    payload              => message_l,
                    msgid                => message_id_l);
   commit;
end;


Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow