Oracle Database
Oracle Advanced Queuing (AQ)
Ricerca…
Osservazioni
Non utilizzare mai DDL o DML contro tabelle create da
dbms_aqadm.create_queue_table
. Utilizzare solo dbms_aqadm e dbms_aq per lavorare con queste tabelle. Oracle potrebbe fare diverse tabelle di supporto, indici, ecc. Di cui non sarai a conoscenza. L'esecuzione manuale di DDL o DML sulla tabella può portare a uno scenario in cui il supporto Oracle richiede la rimozione e la ricreazione della tabella e delle code per risolvere la situazione.Si consiglia vivamente di non utilizzare
dbms_aq.forever
per l'opzione di attesa. Ciò ha causato problemi in passato in quanto Oracle potrebbe iniziare a pianificare un numero eccessivo di lavori di lavoro per far funzionare le code non necessarie (vedere Oracle Doc ID 2001165.1).Si consiglia di non impostare il parametro AQ_TM_PROCESSES nella versione 10.1 e successive. Evitare in particolare di azzerare questo dato che disabiliterà il lavoro in background QMON necessario per mantenere le code. È possibile ripristinare questo valore su Oracle predefinito utilizzando il seguente comando e riavviando il database.
alter system reset aq_tm_processes scope=spfile sid='*';
Semplice produttore / consumatore
Panoramica
Crea una coda alla quale possiamo inviare un messaggio. Oracle notificherà alla nostra stored procedure che un messaggio è stato accodato e deve essere lavorato. Aggiungiamo anche alcuni sottoprogrammi che possiamo utilizzare in caso di emergenza per impedire che i messaggi vengano squalificati, consentire nuovamente la rimozione della coda e avviare un semplice processo batch per gestire tutti i messaggi.
Questi esempi sono stati testati su Oracle Database 12c Enterprise Edition versione 12.1.0.2.0 - Produzione a 64 bit.
Crea coda
Creeremo un tipo di messaggio, una tabella della coda che può contenere i messaggi e una coda. I messaggi nella coda verranno eliminati prima dalla priorità, quindi sarà il loro tempo di accodamento. Se qualcosa va storto, lavorando il messaggio e il dequeue viene eseguito il rollback, AQ renderà il messaggio disponibile per la rimozione della coda dopo 3600 secondi. Lo farà 48 volte prima di spostarlo come una coda di eccezione.
create type message_t as object
(
sender varchar2 ( 50 ),
message varchar2 ( 512 )
);
/
-- Type MESSAGE_T compiled
begin dbms_aqadm.create_queue_table(
queue_table => 'MESSAGE_Q_TBL',
queue_payload_type => 'MESSAGE_T',
sort_list => 'PRIORITY,ENQ_TIME',
multiple_consumers => false,
compatible => '10.0.0');
end;
/
-- PL/SQL procedure successfully completed.
begin dbms_aqadm.create_queue(
queue_name => 'MESSAGE_Q',
queue_table => 'MESSAGE_Q_TBL',
queue_type => 0,
max_retries => 48,
retry_delay => 3600,
dependency_tracking => false);
end;
/
-- PL/SQL procedure successfully completed.
Ora che abbiamo un posto dove mettere i messaggi, creiamo un pacchetto per gestire e lavorare i messaggi in coda.
create or replace package message_worker_pkg
is
queue_name_c constant varchar2(20) := 'MESSAGE_Q';
-- allows the workers to process messages in the queue
procedure enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number);
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages;
end;
/
create or replace package body message_worker_pkg
is
-- raised by Oracle when we try to dequeue but no more messages are ready to
-- be dequeued at this moment
no_more_messages_ex exception;
pragma exception_init (no_more_messages_ex,
-25228);
-- allows the workers to process messages in the queue
procedure enable_dequeue
as
begin
dbms_aqadm.start_queue (queue_name => queue_name_c, dequeue => true);
end enable_dequeue;
-- prevents messages from being worked but will still allow them to be created and enqueued
procedure disable_dequeue
as
begin
dbms_aqadm.stop_queue (queue_name => queue_name_c, dequeue => true, enqueue => false);
end disable_dequeue;
procedure work_message (message_in in out nocopy message_t)
as
begin
dbms_output.put_line ( message_in.sender || ' says ' || message_in.message );
end work_message;
-- called only by Oracle Advanced Queueing. Do not call anywhere else.
procedure on_message_enqueued (context in raw,
reginfo in sys.aq$_reg_info,
descr in sys.aq$_descriptor,
payload in raw,
payloadl in number)
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.msgid := descr.msg_id;
dequeue_options_l.consumer_name := descr.consumer_name;
dequeue_options_l.wait := dbms_aq.no_wait;
dbms_aq.dequeue (queue_name => descr.queue_name,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
exception
when no_more_messages_ex
then
-- it's possible work_old_messages already dequeued the message
commit;
when others
then
-- we don't need to have a raise here. I just wanted to point out that
-- since this will be called by AQ throwing the exception back to it
-- will have it put the message back on the queue and retry later
raise;
end on_message_enqueued;
-- allows messages to be worked if we missed the notification (or a retry
-- is pending)
procedure work_old_messages
as
pragma autonomous_transaction;
dequeue_options_l dbms_aq.dequeue_options_t;
message_id_l raw (16);
message_l message_t;
message_properties_l dbms_aq.message_properties_t;
begin
dequeue_options_l.wait := dbms_aq.no_wait;
dequeue_options_l.navigation := dbms_aq.first_message;
while (true) loop -- way out is no_more_messages_ex
dbms_aq.dequeue (queue_name => queue_name_c,
dequeue_options => dequeue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
work_message (message_l);
commit;
end loop;
exception
when no_more_messages_ex
then
null;
end work_old_messages;
end;
Dì quindi ad AQ che quando un messaggio viene messo in coda a MESSAGE_Q (e impegnato) notifica alla nostra procedura che ha del lavoro da fare. AQ avvierà un lavoro nella propria sessione per gestirlo.
begin
dbms_aq.register (
sys.aq$_reg_info_list (
sys.aq$_reg_info (user || '.' || message_worker_pkg.queue_name_c,
dbms_aq.namespace_aq,
'plsql://' || user || '.message_worker_pkg.on_message_enqueued',
hextoraw ('FF'))),
1);
commit;
end;
Avvia coda e invia un messaggio
declare
enqueue_options_l dbms_aq.enqueue_options_t;
message_properties_l dbms_aq.message_properties_t;
message_id_l raw (16);
message_l message_t;
begin
-- only need to do this next line ONCE
dbms_aqadm.start_queue (queue_name => message_worker_pkg.queue_name_c, enqueue => true , dequeue => true);
message_l := new message_t ( 'Jon', 'Hello, world!' );
dbms_aq.enqueue (queue_name => message_worker_pkg.queue_name_c,
enqueue_options => enqueue_options_l,
message_properties => message_properties_l,
payload => message_l,
msgid => message_id_l);
commit;
end;