Recherche…


Remarques

  • N'utilisez jamais DDL ou DML sur des tables créées par dbms_aqadm.create_queue_table . Utilisez uniquement dbms_aqadm et dbms_aq pour travailler avec ces tables. Oracle peut créer plusieurs tables, index, etc. que vous ne connaissez pas. L'exécution manuelle de DDL ou DML sur la table peut vous conduire à un scénario dans lequel le support Oracle aura besoin de supprimer et de recréer la table et les files d'attente pour résoudre le problème.

  • Il est fortement recommandé de ne pas utiliser dbms_aq.forever pour une option d’attente. Cela a causé des problèmes dans le passé, car Oracle peut commencer à planifier un nombre excessif de tâches de travail pour traiter les files d'attente inutiles (voir Oracle Doc ID 2001165.1).

  • Il est recommandé de ne pas définir le paramètre AQ_TM_PROCESSES dans la version 10.1 ou ultérieure. Surtout, évitez de définir cette valeur sur zéro, car cela désactivera le travail en arrière-plan QMON nécessaire pour gérer les files d'attente. Vous pouvez réinitialiser cette valeur à la valeur par défaut Oracle à l'aide de la commande suivante et en redémarrant la base de données. alter system reset aq_tm_processes scope=spfile sid='*';

Simple producteur / consommateur

Vue d'ensemble

Créez une file d'attente à laquelle vous pouvez envoyer un message. Oracle notifiera à notre procédure stockée qu'un message a été mis en file d'attente et doit être traité. Nous allons également ajouter des sous-programmes que nous pouvons utiliser en cas d'urgence pour empêcher le déqeuing des messages, autoriser une nouvelle mise en file d'attente et exécuter un simple traitement par lots pour traiter tous les messages.

Ces exemples ont été testés sur Oracle Database 12c Enterprise Edition version 12.1.0.2.0 - Production 64 bits.

Créer une file d'attente

Nous allons créer un type de message, une table de files d'attente pouvant contenir les messages et une file d'attente. Les messages dans la file d'attente seront désaffectés en priorité, puis leur heure de mise en file d'attente. Si quelque chose se passe mal en travaillant le message et que la dequeue est annulée, AQ mettra le message à la disposition de la file d'attente 3600 secondes plus tard. Il le fera 48 fois avant de le déplacer dans une file d’exceptions.

create type message_t as object 
   (
   sender  varchar2 ( 50 ),
   message varchar2 ( 512 )
   );
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
     queue_table        => 'MESSAGE_Q_TBL',
     queue_payload_type => 'MESSAGE_T',
     sort_list          => 'PRIORITY,ENQ_TIME',
     multiple_consumers =>  false,
     compatible         => '10.0.0');
  end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
     queue_name          => 'MESSAGE_Q',
     queue_table         => 'MESSAGE_Q_TBL',
     queue_type          =>  0,
     max_retries         =>  48,
     retry_delay         =>  3600,
     dependency_tracking =>  false);
  end;
/
-- PL/SQL procedure successfully completed.

Maintenant que nous avons un endroit pour mettre les messages, créons un package pour gérer et travailler les messages dans la file d'attente.

create or replace package message_worker_pkg
is
   queue_name_c constant varchar2(20) := 'MESSAGE_Q';
   
   -- allows the workers to process messages in the queue
   procedure enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.
   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number);

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages;
   
end;
/

create or replace package body message_worker_pkg
is
   -- raised by Oracle when we try to dequeue but no more messages are ready to
   -- be dequeued at this moment
   no_more_messages_ex          exception;
   pragma exception_init (no_more_messages_ex,
                          -25228);

   -- allows the workers to process messages in the queue
   procedure enable_dequeue
   as
   begin
      dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
   end enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue
   as
   begin
      dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
   end disable_dequeue;

   procedure work_message (message_in in out nocopy message_t)
   as
   begin
      dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
   end work_message;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.

   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number)
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.msgid         := descr.msg_id;
      dequeue_options_l.consumer_name := descr.consumer_name;
      dequeue_options_l.wait          := dbms_aq.no_wait;
      dbms_aq.dequeue (queue_name           => descr.queue_name,
                       dequeue_options      => dequeue_options_l,
                       message_properties   => message_properties_l,
                       payload              => message_l,
                       msgid                => message_id_l);
      work_message (message_l);
      commit;
   exception
      when no_more_messages_ex
      then
         -- it's possible work_old_messages already dequeued the message
         commit;
      when others
      then
         -- we don't need to have a raise here.  I just wanted to point out that
         -- since this will be called by AQ throwing the exception back to it
         -- will have it put the message back on the queue and retry later
         raise;
   end on_message_enqueued;

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.wait       := dbms_aq.no_wait;
      dequeue_options_l.navigation := dbms_aq.first_message;

      while (true) loop -- way out is no_more_messages_ex
         dbms_aq.dequeue (queue_name           => queue_name_c,
                          dequeue_options      => dequeue_options_l,
                          message_properties   => message_properties_l,
                          payload              => message_l,
                          msgid                => message_id_l);
         work_message (message_l);
         commit;
      end loop;
   exception
      when no_more_messages_ex
      then
         null;
   end work_old_messages;
end;

Ensuite, indiquez à AQ que lorsqu'un message est mis en file d'attente sur MESSAGE_Q (et qu'il est validé), notifiez notre procédure. AQ va commencer un travail dans sa propre session pour gérer cela.

begin
  dbms_aq.register (
     sys.aq$_reg_info_list (
        sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
                          dbms_aq.namespace_aq,
                          'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
                          hextoraw ('FF'))),
     1);
  commit;
end; 

Démarrer une file d'attente et envoyer un message

declare
   enqueue_options_l      dbms_aq.enqueue_options_t;
   message_properties_l   dbms_aq.message_properties_t;
   message_id_l           raw (16);
   message_l              message_t;
begin
   -- only need to do this next line ONCE
   dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
   
   message_l := new message_t ( 'Jon', 'Hello, world!' );
   dbms_aq.enqueue (queue_name           => message_worker_pkg.queue_name_c,
                    enqueue_options      => enqueue_options_l,
                    message_properties   => message_properties_l,
                    payload              => message_l,
                    msgid                => message_id_l);
   commit;
end;


Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow