Oracle Database
Oracle Advanced Queuing (AQ)
Sök…
Anmärkningar
Använd aldrig DDL eller DML mot tabeller skapade av
dbms_aqadm.create_queue_table
. Använd bara dbms_aqadm och dbms_aq för att arbeta med dessa tabeller. Oracle kan göra flera stödjande tabeller, index osv. Som du inte kommer att känna till. Att manuellt köra DDL eller DML mot tabellen kan leda dig till ett scenario där Oracle Support behöver dig att släppa och återskapa tabellen och köerna för att lösa situationen.Det rekommenderas starkt att du inte använder
dbms_aq.forever
för ettdbms_aq.forever
. Detta har orsakat problem tidigare eftersom Oracle kan börja schemalägga ett alltför stort antal arbetarjobb för att arbeta de onödig köerna (se Oracle Doc ID 2001165.1).Det rekommenderas att du inte ställer in parametern AQ_TM_PROCESSES i version 10.1 och senare. Undvik särskilt att ställa in detta på noll eftersom det kommer att inaktivera QMON-bakgrundsjobbet som är nödvändigt för att upprätthålla köerna. Du kan återställa detta värde till Oracle-standard med följande kommando och starta om databasen.
alter system reset aq_tm_processes scope=spfile sid='*';
Enkel producent / konsument
Översikt
Skapa en kö som vi kan skicka ett meddelande till. Oracle kommer att meddela vår lagrade procedur att ett meddelande har antagits och bör fungeras. Vi lägger också till några underprogram som vi kan använda i en nödsituation för att stoppa meddelanden från att bli deqeued, tillåta avveckling igen och köra ett enkelt batchjobb för att arbeta igenom alla meddelanden.
Dessa exempel testades på Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production.
Skapa kö
Vi skapar en meddelandetyp, en kötabell som kan innehålla meddelanden och en kö. Meddelanden i kön kommer att avlägsnas först efter prioritering och sedan vara deras övergångstid. Om något går fel fungerar meddelandet och utmatningen rullas tillbaka kommer AQ att göra meddelandet tillgängligt för dequeue 3600 sekunder senare. Det kommer att göra detta 48 gånger innan du flyttar en undantagskö.
create type message_t as object
(
sender varchar2 ( 50 ),
message varchar2 ( 512 )
);
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
queue_table => 'MESSAGE_Q_TBL',
queue_payload_type => 'MESSAGE_T',
sort_list => 'PRIORITY,ENQ_TIME',
multiple_consumers => false,
compatible => '10.0.0');
end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
queue_name => 'MESSAGE_Q',
queue_table => 'MESSAGE_Q_TBL',
queue_type => 0,
max_retries => 48,
retry_delay => 3600,
dependency_tracking => false);
end;
/
-- PL/SQL procedure successfully completed.
Nu när vi har en plats att lägga till meddelanden kan vi skapa ett paket för att hantera och arbeta meddelanden i kön.
create or replace package message_worker_pkg
is
queue_name_c constant varchar2(20) := 'MESSAGE_Q';
-- allows the workers to process messages in the queue
procedure enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number);
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages;
end;
/
create or replace package body message_worker_pkg
is
-- raised by Oracle when we try to dequeue but no more messages are ready to
-- be dequeued at this moment
no_more_messages_ex exception;
pragma exception_init (no_more_messages_ex,
-25228);
-- allows the workers to process messages in the queue
procedure enable_dequeue
as
begin
dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
end enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue
as
begin
dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
end disable_dequeue;
procedure work_message (message_in in out nocopy message_t)
as
begin
dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
end work_message;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number)
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.msgid := descr.msg_id;
dequeue_options_l.consumer_name := descr.consumer_name;
dequeue_options_l.wait := dbms_aq.no_wait;
dbms_aq.dequeue (queue_name => descr.queue_name,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
exception
when no_more_messages_ex
then
-- it's possible work_old_messages already dequeued the message
commit;
when others
then
-- we don't need to have a raise here. I just wanted to point out that
-- since this will be called by AQ throwing the exception back to it
-- will have it put the message back on the queue and retry later
raise;
end on_message_enqueued;
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.wait := dbms_aq.no_wait;
dequeue_options_l.navigation := dbms_aq.first_message;
while (true) loop -- way out is no_more_messages_ex
dbms_aq.dequeue (queue_name => queue_name_c,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
end loop;
exception
when no_more_messages_ex
then
null;
end work_old_messages;
end;
Berätta sedan för AQ att när ett meddelande anmäls till MESSAGE_Q (och begås) meddelar vår procedur så har det arbete att göra. AQ kommer att starta ett jobb i sin egen session för att hantera detta.
begin
dbms_aq.register (
sys.aq$_reg_info_list (
sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
dbms_aq.namespace_aq,
'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
hextoraw ('FF'))),
1);
commit;
end;
Starta kö och skicka ett meddelande
declare
enqueue_options_l dbms_aq.enqueue_options_t;
message_properties_l dbms_aq.message_properties_t;
message_id_l raw (16);
message_l message_t;
begin
-- only need to do this next line ONCE
dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
message_l := new message_t ( 'Jon', 'Hello, world!' );
dbms_aq.enqueue (queue_name => message_worker_pkg.queue_name_c,
enqueue_options => enqueue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
commit;
end;