Oracle Database
Oracle Advanced Queuing (AQ)
Buscar..
Observaciones
Nunca use DDL o DML contra tablas creadas por
dbms_aqadm.create_queue_table
. Utilice solo dbms_aqadm y dbms_aq para trabajar con estas tablas. Oracle puede crear varias tablas de soporte, índices, etc. de los que no tendrá conocimiento. La ejecución manual de DDL o DML en la tabla puede llevarlo a un escenario en el que el soporte de Oracle necesitará que suelte y vuelva a crear la tabla y las colas para resolver la situación.Se recomienda encarecidamente que no utilice
dbms_aq.forever
para una opción de espera. Esto ha provocado problemas en el pasado, ya que Oracle puede comenzar a programar un número excesivo de trabajos de los trabajadores para que funcionen en las colas que no son necesarias (consulte Oracle Doc ID 2001165.1).Se recomienda que no configure el parámetro AQ_TM_PROCESSES en la versión 10.1 y posteriores. Evite especialmente establecer esto en cero ya que esto deshabilitará el trabajo de fondo QMON que es necesario para mantener las colas. Puede restablecer este valor al valor predeterminado de Oracle utilizando el siguiente comando y reiniciando la base de datos.
alter system reset aq_tm_processes scope=spfile sid='*';
Productor / Consumidor Simple
Visión general
Crea una cola a la que podamos enviar un mensaje. Oracle notificará a nuestro procedimiento almacenado que un mensaje ha sido puesto en cola y debe ser trabajado. También agregaremos algunos subprogramas que podemos usar en una emergencia para evitar que los mensajes se desactualicen, permitir que se vuelva a poner en cola y ejecutar un trabajo por lotes simple para trabajar con todos los mensajes.
Estos ejemplos se probaron en Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production.
Crear cola
Crearemos un tipo de mensaje, una tabla de cola que puede contener los mensajes y una cola. Los mensajes en la cola se eliminarán en primer lugar por prioridad y luego su tiempo de salida. Si algo sale mal al trabajar el mensaje y la salida de la cola se retrotrae, AQ hará que el mensaje esté disponible para la salida 3600 segundos después. Lo hará 48 veces antes de moverlo a una cola de excepciones.
create type message_t as object
(
sender varchar2 ( 50 ),
message varchar2 ( 512 )
);
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
queue_table => 'MESSAGE_Q_TBL',
queue_payload_type => 'MESSAGE_T',
sort_list => 'PRIORITY,ENQ_TIME',
multiple_consumers => false,
compatible => '10.0.0');
end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
queue_name => 'MESSAGE_Q',
queue_table => 'MESSAGE_Q_TBL',
queue_type => 0,
max_retries => 48,
retry_delay => 3600,
dependency_tracking => false);
end;
/
-- PL/SQL procedure successfully completed.
Ahora que tenemos un lugar para colocar los mensajes, podemos crear un paquete para administrar y trabajar los mensajes en la cola.
create or replace package message_worker_pkg
is
queue_name_c constant varchar2(20) := 'MESSAGE_Q';
-- allows the workers to process messages in the queue
procedure enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number);
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages;
end;
/
create or replace package body message_worker_pkg
is
-- raised by Oracle when we try to dequeue but no more messages are ready to
-- be dequeued at this moment
no_more_messages_ex exception;
pragma exception_init (no_more_messages_ex,
-25228);
-- allows the workers to process messages in the queue
procedure enable_dequeue
as
begin
dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
end enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue
as
begin
dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
end disable_dequeue;
procedure work_message (message_in in out nocopy message_t)
as
begin
dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
end work_message;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number)
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.msgid := descr.msg_id;
dequeue_options_l.consumer_name := descr.consumer_name;
dequeue_options_l.wait := dbms_aq.no_wait;
dbms_aq.dequeue (queue_name => descr.queue_name,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
exception
when no_more_messages_ex
then
-- it's possible work_old_messages already dequeued the message
commit;
when others
then
-- we don't need to have a raise here. I just wanted to point out that
-- since this will be called by AQ throwing the exception back to it
-- will have it put the message back on the queue and retry later
raise;
end on_message_enqueued;
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.wait := dbms_aq.no_wait;
dequeue_options_l.navigation := dbms_aq.first_message;
while (true) loop -- way out is no_more_messages_ex
dbms_aq.dequeue (queue_name => queue_name_c,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
end loop;
exception
when no_more_messages_ex
then
null;
end work_old_messages;
end;
A continuación, informe a AQ que cuando se envía un mensaje a MESSAGE_Q (y se confirma), notifique a nuestro procedimiento que tiene trabajo que hacer. AQ iniciará un trabajo en su propia sesión para manejar esto.
begin
dbms_aq.register (
sys.aq$_reg_info_list (
sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
dbms_aq.namespace_aq,
'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
hextoraw ('FF'))),
1);
commit;
end;
Iniciar cola y enviar un mensaje
declare
enqueue_options_l dbms_aq.enqueue_options_t;
message_properties_l dbms_aq.message_properties_t;
message_id_l raw (16);
message_l message_t;
begin
-- only need to do this next line ONCE
dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
message_l := new message_t ( 'Jon', 'Hello, world!' );
dbms_aq.enqueue (queue_name => message_worker_pkg.queue_name_c,
enqueue_options => enqueue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
commit;
end;