Search code examples
oraclequeuepropagation

Oracle advanced queue propagation not working for me


I'd like to set up propagation in Oracle AQ (11).

I'd like to propagate from queue "Q" in queue table "QT" to queue "QD" in queue table "QTD".

This is my setup:

DECLARE 
 subscriber sys.aq$_agent; 
BEGIN
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QT',multiple_consumers=>TRUE,queue_payload_type=>'RAW');
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QTD',queue_payload_type=>'RAW');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'Q', queue_table => 'QT'); 
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD', queue_table => 'QTD'); 
 DBMS_AQADM.START_QUEUE(queue_name => 'Q');
 DBMS_AQADM.START_QUEUE(queue_name => 'QD');
 subscriber := sys.aq$_agent('SUB', 'QD', NULL);
 DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber, queue_to_queue => TRUE);
 DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'Q');
END;
/

I send message from a Java aqapi client. The message is sent without error, I can see it in the "Q" queue:

select * from QT;

"Q_NAME"    "MSGID" "CORRID"    "PRIORITY"  "STATE" "DELAY" "EXPIRATION"    "TIME_MANAGER_INFO" "LOCAL_ORDER_NO"    "CHAIN_NO"  "CSCN"  "DSCN"  "ENQ_TIME"  "ENQ_UID"   "ENQ_TID"   "DEQ_TIME"  "DEQ_UID"   "DEQ_TID"   "RETRY_COUNT"   "EXCEPTION_QSCHEMA" "EXCEPTION_QUEUE"   "STEP_NO"   "RECIPIENT_KEY" "DEQUEUE_MSGID" "SENDER_NAME"   "SENDER_ADDRESS"    "SENDER_PROTOCOL"   "USER_DATA" "USER_PROP"
"Q" FC914BFDC7489ECEE040010A393F3DD1    ""  1   0               0   0   0   0   24-JUN-14 07.56.27.258348000 AM "RISKOPALL" "9.5.283837"        ""  ""  0   ""  ""  0   0       ""  ""  0   (BLOB)  

But I can't see it in the "QD" destination queue:

select * from QTD;

Shows empty result.

Do you have any idea what's wrong with it?

I already tried ENABLE_PROPAGATION_SCHEDULE, but it's already enabled after SCHEDULE_PROPAGATION. It results in an error.

I checked this page: http://docs.oracle.com/cd/B28359_01/server.111/b28420/aq_trbl.htm but I can't find DBA_QUEUE_SCHEDULES view. I have admin rights. Where shall I look for it? How shall I troubleshoot propagation?

Any help is really apreciated!


Solution

  • Solved it!

    If the target queue (where messages propagated to) is single-consumer queue then the subscriber name must be set to NULL! This was my problem. It is documented in the 11g doc:

    http://docs.oracle.com/cd/B28359_01/server.111/b28420/aq_admin.htm#i1008642 :

    "The agent name should be NULL if the destination queue is a single consumer queue."

    The problematic line in my setup:

    subscriber := sys.aq$_agent('SUB', 'QD', NULL);
    

    It should have been:

    subscriber := sys.aq$_agent(NULL, 'QD', NULL);
    

    In finding out the problem the nice troubleshooting guide below was really helpful:

    https://blogs.oracle.com/db/entry/oracle_support_master_note_for_troubleshooting_advanced_queuing_and_oracle_streams_propagation_issue

    Section 4.3 advises to check the alert logs. I checked them and indeed in the trace file I found

    kwqpdest: exception 24039
    kwqpdest: Error 24039 propagating to "TEST"."QD"
    

    After that it was not too difficult to find out why 24039 is thrown.

    So afterall here is the working setup in my 11g server. It propagates messages from the source to three targets. The source is multi consumer queue (it must be), the targets are single consumer queues:

    BEGIN
     DBMS_AQADM.DROP_QUEUE_TABLE(queue_table=>'QT', force => TRUE);
     DBMS_AQADM.DROP_QUEUE_TABLE(queue_table=>'QTD', force => TRUE);
    END;
    /
    DECLARE
     subscriber sys.aq$_agent;
    BEGIN
     DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QT',multiple_consumers=>TRUE,
      queue_payload_type=>'SYS.AQ$_JMS_TEXT_MESSAGE');
     DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QTD',multiple_consumers=>FALSE,
      queue_payload_type=>'SYS.AQ$_JMS_TEXT_MESSAGE');
     DBMS_AQADM.CREATE_QUEUE(queue_name => 'Q', queue_table => 'QT');
     DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD1', queue_table => 'QTD');
     DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD2', queue_table => 'QTD');
     DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD3', queue_table => 'QTD');
     DBMS_AQADM.START_QUEUE(queue_name => 'Q');
     DBMS_AQADM.START_QUEUE(queue_name => 'QD1');
     DBMS_AQADM.START_QUEUE(queue_name => 'QD2');
     DBMS_AQADM.START_QUEUE(queue_name => 'QD3');
     subscriber := sys.aq$_agent(NULL, 'QD1', NULL);
     DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber);
     subscriber := sys.aq$_agent(NULL, 'QD2', NULL);
     DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber);
     subscriber := sys.aq$_agent(NULL, 'QD3', NULL);
     DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber);
     DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'Q', latency => 0);
    END;