Sök…


Anmärkningar

  • Använd aldrig DDL eller DML mot tabeller skapade av dbms_aqadm.create_queue_table . Använd bara dbms_aqadm och dbms_aq för att arbeta med dessa tabeller. Oracle kan göra flera stödjande tabeller, index osv. Som du inte kommer att känna till. Att manuellt köra DDL eller DML mot tabellen kan leda dig till ett scenario där Oracle Support behöver dig att släppa och återskapa tabellen och köerna för att lösa situationen.

  • Det rekommenderas starkt att du inte använder dbms_aq.forever för ett dbms_aq.forever . Detta har orsakat problem tidigare eftersom Oracle kan börja schemalägga ett alltför stort antal arbetarjobb för att arbeta de onödig köerna (se Oracle Doc ID 2001165.1).

  • Det rekommenderas att du inte ställer in parametern AQ_TM_PROCESSES i version 10.1 och senare. Undvik särskilt att ställa in detta på noll eftersom det kommer att inaktivera QMON-bakgrundsjobbet som är nödvändigt för att upprätthålla köerna. Du kan återställa detta värde till Oracle-standard med följande kommando och starta om databasen. alter system reset aq_tm_processes scope=spfile sid='*';

Enkel producent / konsument

Översikt

Skapa en kö som vi kan skicka ett meddelande till. Oracle kommer att meddela vår lagrade procedur att ett meddelande har antagits och bör fungeras. Vi lägger också till några underprogram som vi kan använda i en nödsituation för att stoppa meddelanden från att bli deqeued, tillåta avveckling igen och köra ett enkelt batchjobb för att arbeta igenom alla meddelanden.

Dessa exempel testades på Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production.

Skapa kö

Vi skapar en meddelandetyp, en kötabell som kan innehålla meddelanden och en kö. Meddelanden i kön kommer att avlägsnas först efter prioritering och sedan vara deras övergångstid. Om något går fel fungerar meddelandet och utmatningen rullas tillbaka kommer AQ att göra meddelandet tillgängligt för dequeue 3600 sekunder senare. Det kommer att göra detta 48 gånger innan du flyttar en undantagskö.

create type message_t as object 
   (
   sender  varchar2 ( 50 ),
   message varchar2 ( 512 )
   );
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
     queue_table        => 'MESSAGE_Q_TBL',
     queue_payload_type => 'MESSAGE_T',
     sort_list          => 'PRIORITY,ENQ_TIME',
     multiple_consumers =>  false,
     compatible         => '10.0.0');
  end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
     queue_name          => 'MESSAGE_Q',
     queue_table         => 'MESSAGE_Q_TBL',
     queue_type          =>  0,
     max_retries         =>  48,
     retry_delay         =>  3600,
     dependency_tracking =>  false);
  end;
/
-- PL/SQL procedure successfully completed.

Nu när vi har en plats att lägga till meddelanden kan vi skapa ett paket för att hantera och arbeta meddelanden i kön.

create or replace package message_worker_pkg
is
   queue_name_c constant varchar2(20) := 'MESSAGE_Q';
   
   -- allows the workers to process messages in the queue
   procedure enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.
   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number);

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages;
   
end;
/

create or replace package body message_worker_pkg
is
   -- raised by Oracle when we try to dequeue but no more messages are ready to
   -- be dequeued at this moment
   no_more_messages_ex          exception;
   pragma exception_init (no_more_messages_ex,
                          -25228);

   -- allows the workers to process messages in the queue
   procedure enable_dequeue
   as
   begin
      dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
   end enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue
   as
   begin
      dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
   end disable_dequeue;

   procedure work_message (message_in in out nocopy message_t)
   as
   begin
      dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
   end work_message;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.

   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number)
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.msgid         := descr.msg_id;
      dequeue_options_l.consumer_name := descr.consumer_name;
      dequeue_options_l.wait          := dbms_aq.no_wait;
      dbms_aq.dequeue (queue_name           => descr.queue_name,
                       dequeue_options      => dequeue_options_l,
                       message_properties   => message_properties_l,
                       payload              => message_l,
                       msgid                => message_id_l);
      work_message (message_l);
      commit;
   exception
      when no_more_messages_ex
      then
         -- it's possible work_old_messages already dequeued the message
         commit;
      when others
      then
         -- we don't need to have a raise here.  I just wanted to point out that
         -- since this will be called by AQ throwing the exception back to it
         -- will have it put the message back on the queue and retry later
         raise;
   end on_message_enqueued;

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.wait       := dbms_aq.no_wait;
      dequeue_options_l.navigation := dbms_aq.first_message;

      while (true) loop -- way out is no_more_messages_ex
         dbms_aq.dequeue (queue_name           => queue_name_c,
                          dequeue_options      => dequeue_options_l,
                          message_properties   => message_properties_l,
                          payload              => message_l,
                          msgid                => message_id_l);
         work_message (message_l);
         commit;
      end loop;
   exception
      when no_more_messages_ex
      then
         null;
   end work_old_messages;
end;

Berätta sedan för AQ att när ett meddelande anmäls till MESSAGE_Q (och begås) meddelar vår procedur så har det arbete att göra. AQ kommer att starta ett jobb i sin egen session för att hantera detta.

begin
  dbms_aq.register (
     sys.aq$_reg_info_list (
        sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
                          dbms_aq.namespace_aq,
                          'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
                          hextoraw ('FF'))),
     1);
  commit;
end; 

Starta kö och skicka ett meddelande

declare
   enqueue_options_l      dbms_aq.enqueue_options_t;
   message_properties_l   dbms_aq.message_properties_t;
   message_id_l           raw (16);
   message_l              message_t;
begin
   -- only need to do this next line ONCE
   dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
   
   message_l := new message_t ( 'Jon', 'Hello, world!' );
   dbms_aq.enqueue (queue_name           => message_worker_pkg.queue_name_c,
                    enqueue_options      => enqueue_options_l,
                    message_properties   => message_properties_l,
                    payload              => message_l,
                    msgid                => message_id_l);
   commit;
end;


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow