Oracle Database
Oracle Advanced Queuing (AQ)
Recherche…
Remarques
N'utilisez jamais DDL ou DML sur des tables créées par
dbms_aqadm.create_queue_table
. Utilisez uniquement dbms_aqadm et dbms_aq pour travailler avec ces tables. Oracle peut créer plusieurs tables, index, etc. que vous ne connaissez pas. L'exécution manuelle de DDL ou DML sur la table peut vous conduire à un scénario dans lequel le support Oracle aura besoin de supprimer et de recréer la table et les files d'attente pour résoudre le problème.Il est fortement recommandé de ne pas utiliser
dbms_aq.forever
pour une option d’attente. Cela a causé des problèmes dans le passé, car Oracle peut commencer à planifier un nombre excessif de tâches de travail pour traiter les files d'attente inutiles (voir Oracle Doc ID 2001165.1).Il est recommandé de ne pas définir le paramètre AQ_TM_PROCESSES dans la version 10.1 ou ultérieure. Surtout, évitez de définir cette valeur sur zéro, car cela désactivera le travail en arrière-plan QMON nécessaire pour gérer les files d'attente. Vous pouvez réinitialiser cette valeur à la valeur par défaut Oracle à l'aide de la commande suivante et en redémarrant la base de données.
alter system reset aq_tm_processes scope=spfile sid='*';
Simple producteur / consommateur
Vue d'ensemble
Créez une file d'attente à laquelle vous pouvez envoyer un message. Oracle notifiera à notre procédure stockée qu'un message a été mis en file d'attente et doit être traité. Nous allons également ajouter des sous-programmes que nous pouvons utiliser en cas d'urgence pour empêcher le déqeuing des messages, autoriser une nouvelle mise en file d'attente et exécuter un simple traitement par lots pour traiter tous les messages.
Ces exemples ont été testés sur Oracle Database 12c Enterprise Edition version 12.1.0.2.0 - Production 64 bits.
Créer une file d'attente
Nous allons créer un type de message, une table de files d'attente pouvant contenir les messages et une file d'attente. Les messages dans la file d'attente seront désaffectés en priorité, puis leur heure de mise en file d'attente. Si quelque chose se passe mal en travaillant le message et que la dequeue est annulée, AQ mettra le message à la disposition de la file d'attente 3600 secondes plus tard. Il le fera 48 fois avant de le déplacer dans une file d’exceptions.
create type message_t as object
(
sender varchar2 ( 50 ),
message varchar2 ( 512 )
);
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
queue_table => 'MESSAGE_Q_TBL',
queue_payload_type => 'MESSAGE_T',
sort_list => 'PRIORITY,ENQ_TIME',
multiple_consumers => false,
compatible => '10.0.0');
end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
queue_name => 'MESSAGE_Q',
queue_table => 'MESSAGE_Q_TBL',
queue_type => 0,
max_retries => 48,
retry_delay => 3600,
dependency_tracking => false);
end;
/
-- PL/SQL procedure successfully completed.
Maintenant que nous avons un endroit pour mettre les messages, créons un package pour gérer et travailler les messages dans la file d'attente.
create or replace package message_worker_pkg
is
queue_name_c constant varchar2(20) := 'MESSAGE_Q';
-- allows the workers to process messages in the queue
procedure enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number);
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages;
end;
/
create or replace package body message_worker_pkg
is
-- raised by Oracle when we try to dequeue but no more messages are ready to
-- be dequeued at this moment
no_more_messages_ex exception;
pragma exception_init (no_more_messages_ex,
-25228);
-- allows the workers to process messages in the queue
procedure enable_dequeue
as
begin
dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
end enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue
as
begin
dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
end disable_dequeue;
procedure work_message (message_in in out nocopy message_t)
as
begin
dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
end work_message;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number)
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.msgid := descr.msg_id;
dequeue_options_l.consumer_name := descr.consumer_name;
dequeue_options_l.wait := dbms_aq.no_wait;
dbms_aq.dequeue (queue_name => descr.queue_name,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
exception
when no_more_messages_ex
then
-- it's possible work_old_messages already dequeued the message
commit;
when others
then
-- we don't need to have a raise here. I just wanted to point out that
-- since this will be called by AQ throwing the exception back to it
-- will have it put the message back on the queue and retry later
raise;
end on_message_enqueued;
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.wait := dbms_aq.no_wait;
dequeue_options_l.navigation := dbms_aq.first_message;
while (true) loop -- way out is no_more_messages_ex
dbms_aq.dequeue (queue_name => queue_name_c,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
end loop;
exception
when no_more_messages_ex
then
null;
end work_old_messages;
end;
Ensuite, indiquez à AQ que lorsqu'un message est mis en file d'attente sur MESSAGE_Q (et qu'il est validé), notifiez notre procédure. AQ va commencer un travail dans sa propre session pour gérer cela.
begin
dbms_aq.register (
sys.aq$_reg_info_list (
sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
dbms_aq.namespace_aq,
'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
hextoraw ('FF'))),
1);
commit;
end;
Démarrer une file d'attente et envoyer un message
declare
enqueue_options_l dbms_aq.enqueue_options_t;
message_properties_l dbms_aq.message_properties_t;
message_id_l raw (16);
message_l message_t;
begin
-- only need to do this next line ONCE
dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
message_l := new message_t ( 'Jon', 'Hello, world!' );
dbms_aq.enqueue (queue_name => message_worker_pkg.queue_name_c,
enqueue_options => enqueue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
commit;
end;