Suche…


Bemerkungen

  • Verwenden Sie niemals DDL oder DML für Tabellen, die mit dbms_aqadm.create_queue_table erstellt wurden. Verwenden Sie nur dbms_aqadm und dbms_aq, um mit diesen Tabellen zu arbeiten. Oracle erstellt möglicherweise mehrere unterstützende Tabellen, Indizes usw., die Sie nicht kennen. Das manuelle Ausführen von DDL oder DML für die Tabelle kann zu einem Szenario führen, in dem der Oracle-Support die Tabelle und die Warteschlangen löschen und erneut erstellen muss, um die Situation zu beheben.

  • Es wird dringend empfohlen, dbms_aq.forever für eine Wait-Option nicht zu verwenden. Dies hat in der Vergangenheit zu Problemen geführt, da Oracle möglicherweise eine übermäßige Anzahl von Worker-Jobs für die Bearbeitung der nicht benötigten Warteschlangen einplanen kann (siehe Oracle Doc ID 2001165.1).

  • Es wird empfohlen, den Parameter AQ_TM_PROCESSES in Version 10.1 und höher nicht festzulegen. Vermeiden Sie insbesondere, diesen Wert auf Null zu setzen, da dadurch der QMON-Hintergrundjob deaktiviert wird, der zum Verwalten der Warteschlangen erforderlich ist. Sie können diesen Wert mit dem folgenden Befehl auf den Oracle-Standard zurücksetzen und die Datenbank neu starten. alter system reset aq_tm_processes scope=spfile sid='*';

Einfacher Produzent / Verbraucher

Überblick

Erstellen Sie eine Warteschlange, an die wir eine Nachricht senden können. Oracle benachrichtigt unsere gespeicherte Prozedur darüber, dass eine Nachricht in die Warteschlange aufgenommen wurde und bearbeitet werden sollte. Wir fügen auch einige Unterprogramme hinzu, die wir in einem Notfall verwenden können, um das Abmelden von Nachrichten zu verhindern, ein erneutes Löschen von Nachrichten zuzulassen und einen einfachen Batch-Job auszuführen, um alle Nachrichten durchzuarbeiten.

Diese Beispiele wurden mit Oracle Database 12c Enterprise Edition Version 12.1.0.2.0 - 64-Bit-Produktion getestet.

Warteschlange erstellen

Wir erstellen einen Nachrichtentyp, eine Warteschlangentabelle, die die Nachrichten enthalten kann, und eine Warteschlange. Nachrichten in der Warteschlange werden zuerst nach Priorität und dann nach ihrer Wartezeit aus der Warteschlange entfernt. Wenn beim Ausführen der Nachricht etwas schief läuft und die Zurücksetzung rückgängig gemacht wird, stellt AQ die Nachricht 3600 Sekunden später für die Zurücksetzung zur Verfügung. Es wird dies 48 Mal tun, bevor es in eine Ausnahmewarteschlange verschoben wird.

create type message_t as object 
   (
   sender  varchar2 ( 50 ),
   message varchar2 ( 512 )
   );
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
     queue_table        => 'MESSAGE_Q_TBL',
     queue_payload_type => 'MESSAGE_T',
     sort_list          => 'PRIORITY,ENQ_TIME',
     multiple_consumers =>  false,
     compatible         => '10.0.0');
  end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
     queue_name          => 'MESSAGE_Q',
     queue_table         => 'MESSAGE_Q_TBL',
     queue_type          =>  0,
     max_retries         =>  48,
     retry_delay         =>  3600,
     dependency_tracking =>  false);
  end;
/
-- PL/SQL procedure successfully completed.

Nun, da wir einen Platz für die Nachrichten haben, können Sie ein Paket zum Verwalten und Bearbeiten von Nachrichten in der Warteschlange erstellen.

create or replace package message_worker_pkg
is
   queue_name_c constant varchar2(20) := 'MESSAGE_Q';
   
   -- allows the workers to process messages in the queue
   procedure enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.
   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number);

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages;
   
end;
/

create or replace package body message_worker_pkg
is
   -- raised by Oracle when we try to dequeue but no more messages are ready to
   -- be dequeued at this moment
   no_more_messages_ex          exception;
   pragma exception_init (no_more_messages_ex,
                          -25228);

   -- allows the workers to process messages in the queue
   procedure enable_dequeue
   as
   begin
      dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
   end enable_dequeue;

   -- prevents messages from being worked but will still allow them to be created and enqueued
   procedure disable_dequeue
   as
   begin
      dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
   end disable_dequeue;

   procedure work_message (message_in in out nocopy message_t)
   as
   begin
      dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
   end work_message;

   -- called only by Oracle Advanced Queueing.  Do not call anywhere else.

   procedure on_message_enqueued (context        in raw,
                                  reginfo        in sys.aq$_reg_info,
                                  descr          in sys.aq$_descriptor,
                                  payload        in raw,
                                  payloadl       in number)
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.msgid         := descr.msg_id;
      dequeue_options_l.consumer_name := descr.consumer_name;
      dequeue_options_l.wait          := dbms_aq.no_wait;
      dbms_aq.dequeue (queue_name           => descr.queue_name,
                       dequeue_options      => dequeue_options_l,
                       message_properties   => message_properties_l,
                       payload              => message_l,
                       msgid                => message_id_l);
      work_message (message_l);
      commit;
   exception
      when no_more_messages_ex
      then
         -- it's possible work_old_messages already dequeued the message
         commit;
      when others
      then
         -- we don't need to have a raise here.  I just wanted to point out that
         -- since this will be called by AQ throwing the exception back to it
         -- will have it put the message back on the queue and retry later
         raise;
   end on_message_enqueued;

   -- allows messages to be worked if we missed the notification (or a retry
   -- is pending)
   procedure work_old_messages
   as
      pragma autonomous_transaction;
      dequeue_options_l      dbms_aq.dequeue_options_t;
      message_id_l           raw (16);
      message_l              message_t;
      message_properties_l   dbms_aq.message_properties_t;
   begin
      dequeue_options_l.wait       := dbms_aq.no_wait;
      dequeue_options_l.navigation := dbms_aq.first_message;

      while (true) loop -- way out is no_more_messages_ex
         dbms_aq.dequeue (queue_name           => queue_name_c,
                          dequeue_options      => dequeue_options_l,
                          message_properties   => message_properties_l,
                          payload              => message_l,
                          msgid                => message_id_l);
         work_message (message_l);
         commit;
      end loop;
   exception
      when no_more_messages_ex
      then
         null;
   end work_old_messages;
end;

Sagen Sie als Nächstes AQ, dass, wenn eine Nachricht in MESSAGE_Q in die Warteschlange eingereiht wird (und festgeschrieben wird), unsere Prozedur benachrichtigt wird, die sie zu erledigen hat. AQ startet einen Job in einer eigenen Sitzung, um dies zu erledigen.

begin
  dbms_aq.register (
     sys.aq$_reg_info_list (
        sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
                          dbms_aq.namespace_aq,
                          'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
                          hextoraw ('FF'))),
     1);
  commit;
end; 

Warteschlange starten und Nachricht senden

declare
   enqueue_options_l      dbms_aq.enqueue_options_t;
   message_properties_l   dbms_aq.message_properties_t;
   message_id_l           raw (16);
   message_l              message_t;
begin
   -- only need to do this next line ONCE
   dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
   
   message_l := new message_t ( 'Jon', 'Hello, world!' );
   dbms_aq.enqueue (queue_name           => message_worker_pkg.queue_name_c,
                    enqueue_options      => enqueue_options_l,
                    message_properties   => message_properties_l,
                    payload              => message_l,
                    msgid                => message_id_l);
   commit;
end;


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow