Zoeken…


Opmerkingen

  • Gebruik nooit DDL of DML tegen tabellen die zijn gemaakt door dbms_aqadm.create_queue_table . Gebruik alleen dbms_aqadm en dbms_aq om met deze tabellen te werken. Oracle kan verschillende ondersteunende tabellen, indexen, enz. Maken waarvan u niet op de hoogte bent. Handmatig uitvoeren van DDL of DML tegen de tafel kan leiden tot een scenario waarin Oracle Support u moet laten vallen en de tabel en wachtrijen opnieuw moet maken om de situatie op te lossen.

  • Het wordt ten zeerste aanbevolen om dbms_aq.forever niet te gebruiken voor een dbms_aq.forever . Dit heeft in het verleden problemen veroorzaakt omdat Oracle mogelijk een overmatig aantal werkopdrachten kan plannen om onnodige wachtrijen te verwerken (zie Oracle Doc ID 2001165.1).

  • Het wordt aanbevolen dat u de parameter AQ_TM_PROCESSES niet instelt in versie 10.1 en hoger. Stel dit met name niet in op nul, omdat hierdoor de QMON-achtergrondtaak wordt uitgeschakeld die nodig is om de wachtrijen bij te houden. U kunt deze waarde opnieuw instellen op de standaard Oracle met de volgende opdracht en de database opnieuw opstarten. alter system reset aq_tm_processes scope=spfile sid='*';

Eenvoudige producent / consument

Overzicht

Maak een wachtrij waarnaar we een bericht kunnen sturen. Oracle zal onze opgeslagen procedure melden dat een bericht is vrijgegeven en moet worden verwerkt. We zullen ook enkele subprogramma's toevoegen die we in een noodgeval kunnen gebruiken om te voorkomen dat berichten deqeued worden, opnieuw wachtrijen toestaan en een eenvoudige batchopdracht uitvoeren om alle berichten te verwerken.

Deze voorbeelden zijn getest op Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit-productie.

Maak wachtrij

We maken een berichttype, een wachtrijtabel die de berichten kan bevatten en een wachtrij. Berichten in de wachtrij worden eerst op prioriteit verwijderd en vervolgens op hun wachttijd. Als er iets misgaat met de werking van het bericht en de dequeue wordt teruggedraaid, maakt AQ het bericht 3600 seconden later beschikbaar voor dequeue. Het doet dit 48 keer voordat het een uitzonderingswachtrij verplaatst.

create type message_t as object 
   (
   sender  varchar2 ( 50 ),
   message varchar2 ( 512 )
   );
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
     queue_table        => 'MESSAGE_Q_TBL',
     queue_payload_type => 'MESSAGE_T',
     sort_list          => 'PRIORITY,ENQ_TIME',
     multiple_consumers =>  false,
     compatible         => '10.0.0');
  end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
     queue_name          => 'MESSAGE_Q',
     queue_table         => 'MESSAGE_Q_TBL',
     queue_type          =>  0,
     max_retries         =>  48,
     retry_delay         =>  3600,
     dependency_tracking =>  false);
  end;
/
-- PL/SQL procedure successfully completed.

Nu we een plaats hebben om de berichten te plaatsen, kunnen we een pakket maken om berichten in de wachtrij te beheren en te bewerken.

create or replace package message_worker_pkg
is
   queue_name_c constant varchar2(20) := 'MESSAGE_Q';
   
   -- allows the workers to process messages in the queue
   procedure enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.
   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number);

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages;
   
end;
/

create or replace package body message_worker_pkg
is
   -- raised by Oracle when we try to dequeue but no more messages are ready to
   -- be dequeued at this moment
   no_more_messages_ex          exception;
   pragma exception_init (no_more_messages_ex,
                          -25228);

   -- allows the workers to process messages in the queue
   procedure enable_dequeue
   as
   begin
      dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
   end enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue
   as
   begin
      dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
   end disable_dequeue;

   procedure work_message (message_in in out nocopy message_t)
   as
   begin
      dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
   end work_message;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.

   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number)
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.msgid         := descr.msg_id;
      dequeue_options_l.consumer_name := descr.consumer_name;
      dequeue_options_l.wait          := dbms_aq.no_wait;
      dbms_aq.dequeue (queue_name           => descr.queue_name,
                       dequeue_options      => dequeue_options_l,
                       message_properties   => message_properties_l,
                       payload              => message_l,
                       msgid                => message_id_l);
      work_message (message_l);
      commit;
   exception
      when no_more_messages_ex
      then
         -- it's possible work_old_messages already dequeued the message
         commit;
      when others
      then
         -- we don't need to have a raise here.  I just wanted to point out that
         -- since this will be called by AQ throwing the exception back to it
         -- will have it put the message back on the queue and retry later
         raise;
   end on_message_enqueued;

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.wait       := dbms_aq.no_wait;
      dequeue_options_l.navigation := dbms_aq.first_message;

      while (true) loop -- way out is no_more_messages_ex
         dbms_aq.dequeue (queue_name           => queue_name_c,
                          dequeue_options      => dequeue_options_l,
                          message_properties   => message_properties_l,
                          payload              => message_l,
                          msgid                => message_id_l);
         work_message (message_l);
         commit;
      end loop;
   exception
      when no_more_messages_ex
      then
         null;
   end work_old_messages;
end;

Vertel vervolgens AQ dat wanneer een bericht wordt geplaatst aan MESSAGE_Q (en gecommitteerd) onze procedure op de hoogte moet stellen dat het werk moet doen. AQ start een taak in zijn eigen sessie om dit af te handelen.

begin
  dbms_aq.register (
     sys.aq$_reg_info_list (
        sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
                          dbms_aq.namespace_aq,
                          'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
                          hextoraw ('FF'))),
     1);
  commit;
end; 

Start wachtrij en stuur een bericht

declare
   enqueue_options_l      dbms_aq.enqueue_options_t;
   message_properties_l   dbms_aq.message_properties_t;
   message_id_l           raw (16);
   message_l              message_t;
begin
   -- only need to do this next line ONCE
   dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
   
   message_l := new message_t ( 'Jon', 'Hello, world!' );
   dbms_aq.enqueue (queue_name           => message_worker_pkg.queue_name_c,
                    enqueue_options      => enqueue_options_l,
                    message_properties   => message_properties_l,
                    payload              => message_l,
                    msgid                => message_id_l);
   commit;
end;


Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow