Oracle Database
Oracle Advanced Queuing (AQ)
Suche…
Bemerkungen
Verwenden Sie niemals DDL oder DML für Tabellen, die mit
dbms_aqadm.create_queue_table
erstellt wurden. Verwenden Sie nur dbms_aqadm und dbms_aq, um mit diesen Tabellen zu arbeiten. Oracle erstellt möglicherweise mehrere unterstützende Tabellen, Indizes usw., die Sie nicht kennen. Das manuelle Ausführen von DDL oder DML für die Tabelle kann zu einem Szenario führen, in dem der Oracle-Support die Tabelle und die Warteschlangen löschen und erneut erstellen muss, um die Situation zu beheben.Es wird dringend empfohlen,
dbms_aq.forever
für eine Wait-Option nicht zu verwenden. Dies hat in der Vergangenheit zu Problemen geführt, da Oracle möglicherweise eine übermäßige Anzahl von Worker-Jobs für die Bearbeitung der nicht benötigten Warteschlangen einplanen kann (siehe Oracle Doc ID 2001165.1).Es wird empfohlen, den Parameter AQ_TM_PROCESSES in Version 10.1 und höher nicht festzulegen. Vermeiden Sie insbesondere, diesen Wert auf Null zu setzen, da dadurch der QMON-Hintergrundjob deaktiviert wird, der zum Verwalten der Warteschlangen erforderlich ist. Sie können diesen Wert mit dem folgenden Befehl auf den Oracle-Standard zurücksetzen und die Datenbank neu starten.
alter system reset aq_tm_processes scope=spfile sid='*';
Einfacher Produzent / Verbraucher
Überblick
Erstellen Sie eine Warteschlange, an die wir eine Nachricht senden können. Oracle benachrichtigt unsere gespeicherte Prozedur darüber, dass eine Nachricht in die Warteschlange aufgenommen wurde und bearbeitet werden sollte. Wir fügen auch einige Unterprogramme hinzu, die wir in einem Notfall verwenden können, um das Abmelden von Nachrichten zu verhindern, ein erneutes Löschen von Nachrichten zuzulassen und einen einfachen Batch-Job auszuführen, um alle Nachrichten durchzuarbeiten.
Diese Beispiele wurden mit Oracle Database 12c Enterprise Edition Version 12.1.0.2.0 - 64-Bit-Produktion getestet.
Warteschlange erstellen
Wir erstellen einen Nachrichtentyp, eine Warteschlangentabelle, die die Nachrichten enthalten kann, und eine Warteschlange. Nachrichten in der Warteschlange werden zuerst nach Priorität und dann nach ihrer Wartezeit aus der Warteschlange entfernt. Wenn beim Ausführen der Nachricht etwas schief läuft und die Zurücksetzung rückgängig gemacht wird, stellt AQ die Nachricht 3600 Sekunden später für die Zurücksetzung zur Verfügung. Es wird dies 48 Mal tun, bevor es in eine Ausnahmewarteschlange verschoben wird.
create type message_t as object
(
sender varchar2 ( 50 ),
message varchar2 ( 512 )
);
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
queue_table => 'MESSAGE_Q_TBL',
queue_payload_type => 'MESSAGE_T',
sort_list => 'PRIORITY,ENQ_TIME',
multiple_consumers => false,
compatible => '10.0.0');
end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
queue_name => 'MESSAGE_Q',
queue_table => 'MESSAGE_Q_TBL',
queue_type => 0,
max_retries => 48,
retry_delay => 3600,
dependency_tracking => false);
end;
/
-- PL/SQL procedure successfully completed.
Nun, da wir einen Platz für die Nachrichten haben, können Sie ein Paket zum Verwalten und Bearbeiten von Nachrichten in der Warteschlange erstellen.
create or replace package message_worker_pkg
is
queue_name_c constant varchar2(20) := 'MESSAGE_Q';
-- allows the workers to process messages in the queue
procedure enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number);
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages;
end;
/
create or replace package body message_worker_pkg
is
-- raised by Oracle when we try to dequeue but no more messages are ready to
-- be dequeued at this moment
no_more_messages_ex exception;
pragma exception_init (no_more_messages_ex,
-25228);
-- allows the workers to process messages in the queue
procedure enable_dequeue
as
begin
dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
end enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue
as
begin
dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
end disable_dequeue;
procedure work_message (message_in in out nocopy message_t)
as
begin
dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
end work_message;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number)
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.msgid := descr.msg_id;
dequeue_options_l.consumer_name := descr.consumer_name;
dequeue_options_l.wait := dbms_aq.no_wait;
dbms_aq.dequeue (queue_name => descr.queue_name,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
exception
when no_more_messages_ex
then
-- it's possible work_old_messages already dequeued the message
commit;
when others
then
-- we don't need to have a raise here. I just wanted to point out that
-- since this will be called by AQ throwing the exception back to it
-- will have it put the message back on the queue and retry later
raise;
end on_message_enqueued;
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.wait := dbms_aq.no_wait;
dequeue_options_l.navigation := dbms_aq.first_message;
while (true) loop -- way out is no_more_messages_ex
dbms_aq.dequeue (queue_name => queue_name_c,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
end loop;
exception
when no_more_messages_ex
then
null;
end work_old_messages;
end;
Sagen Sie als Nächstes AQ, dass, wenn eine Nachricht in MESSAGE_Q in die Warteschlange eingereiht wird (und festgeschrieben wird), unsere Prozedur benachrichtigt wird, die sie zu erledigen hat. AQ startet einen Job in einer eigenen Sitzung, um dies zu erledigen.
begin
dbms_aq.register (
sys.aq$_reg_info_list (
sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
dbms_aq.namespace_aq,
'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
hextoraw ('FF'))),
1);
commit;
end;
Warteschlange starten und Nachricht senden
declare
enqueue_options_l dbms_aq.enqueue_options_t;
message_properties_l dbms_aq.message_properties_t;
message_id_l raw (16);
message_l message_t;
begin
-- only need to do this next line ONCE
dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
message_l := new message_t ( 'Jon', 'Hello, world!' );
dbms_aq.enqueue (queue_name => message_worker_pkg.queue_name_c,
enqueue_options => enqueue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
commit;
end;