Search code examples
javaoracleadvanced-queuing

Oracle Advanced Queue In Java


I am implementing Oracle Advanced Queuing in Java and am completely new to it. I have a few questions about it. Below is my code:

package com;

/* Set up main class from which we will call subsequent examples and handle 
 exceptions: */
import java.sql.*;

import oracle.AQ.*;

/**
 * Minimal Oracle Advanced Queuing (AQ) example using the oracle.AQ Java API:
 * opens an AQ session over JDBC, creates a RAW queue table and a queue in it,
 * enqueues one message and (optionally) dequeues it again.
 *
 * NOTE(review): createAqTables() always tries to create aq_table1/aq_queue1;
 * presumably it fails once they already exist, so on later runs it likely has
 * to be skipped and only enqueueMsg()/dequeueMsg() called - confirm.
 */
public class test_aqjava
{
    /**
     * Runs the example: create session, create queue objects, enqueue one
     * message. The session is always closed, also when a step fails.
     *
     * @param args command line arguments, passed through to createSession
     */
    public static void main(String args[]) 
    {
        AQSession  aq_sess = null;
        try 
        {
            aq_sess = createSession(args);
            if (aq_sess == null)
            {
                /* createSession has already printed the cause; stop here
                   instead of failing later with a NullPointerException. */
                System.out.println("Exception-1: could not create AQSession");
                return;
            }
            createAqTables(aq_sess);
            enqueueMsg(aq_sess);
            /* Uncomment to read the message back: */
            // dequeueMsg(aq_sess);
        }
        catch (Exception ex)
        {
            System.out.println("Exception-1: " + ex); 
            ex.printStackTrace();      
        }
        finally
        {
            /* Close the session on every path so it does not leak when one
               of the steps above throws. */
            if (aq_sess != null)
            {
                try
                {
                    aq_sess.close();
                }
                catch (Exception closeEx)
                {
                    System.out.println("Exception-1: " + closeEx);
                    closeEx.printStackTrace();
                }
            }
        }
    }

    /**
     * Opens a JDBC connection (auto-commit disabled) and creates an AQ
     * session on top of it.
     *
     * @param args command line arguments (not used)
     * @return the new AQ session, or null when the connection or the session
     *         could not be created (the cause is printed)
     */
    public static AQSession createSession(String args[]) 
    {
        Connection db_conn = null;
        AQSession  aq_sess = null;

        try 
        {
            Class.forName("oracle.jdbc.driver.OracleDriver");

            /* Replace hostname, port, SID, user and password with the
               values of your own database: */
            db_conn =
                    DriverManager.getConnection(
                            "jdbc:oracle:thin:@hostname.com:1521:sid", 
                            "USER", "USER");

            System.out.println("JDBC Connection opened "); 
            /* Enqueue/dequeue become part of a transaction that the caller
               commits explicitly. */
            db_conn.setAutoCommit(false);

            /* Load the Oracle AQ driver: */
            Class.forName("oracle.AQ.AQOracleDriver");

            /* Creating an AQ Session: */
            aq_sess = AQDriverManager.createAQSession(db_conn);
            System.out.println("Successfully created AQSession ");  
        }
        catch (Exception ex)
        {
            System.out.println("Exception: " + ex); 
            ex.printStackTrace();

            /* No session was created, so nobody else will ever close the
               JDBC connection - release it here. */
            if (aq_sess == null && db_conn != null)
            {
                try
                {
                    db_conn.close();
                }
                catch (SQLException closeEx)
                {
                    closeEx.printStackTrace();
                }
            }
        }  
        return aq_sess;
    }

    /**
     * Creates queue table aq_table1 (RAW payload) and queue aq_queue1 in
     * schema USER, then starts the queue.
     *
     * @param aq_sess open AQ session
     * @throws AQException if any of the AQ calls fails
     */
    public static void createAqTables(AQSession aq_sess) throws AQException
    {
        AQQueueTableProperty     qtable_prop;
        AQQueueProperty          queue_prop;
        AQQueueTable             q_table;
        AQQueue                  queue;


        /* Creating a AQQueueTableProperty object (payload type - RAW): */
        qtable_prop = new AQQueueTableProperty("RAW"); 

        /* Creating a queue table called aq_table1 in schema USER: */
        q_table = aq_sess.createQueueTable ("USER", "aq_table1", qtable_prop);
        System.out.println("Successfully created aq_table1 in aqjava schema");  

        /* Creating a new AQQueueProperty object */
        queue_prop = new AQQueueProperty();

        /* Creating a queue called aq_queue1 in aq_table1: */
        queue = aq_sess.createQueue (q_table, "aq_queue1", queue_prop);
        System.out.println("Successfully created aq_queue1 in aq_table1");  

        /* Enable enqueue/dequeue on this queue: */
        queue.start();
        System.out.println("Successful start queue");  
    }

    /**
     * Enqueues the text "new message" as a RAW payload into USER.aq_queue1
     * and commits the transaction.
     *
     * @param aq_sess open AQ session (must be an AQOracleSession)
     * @throws AQException if a queue lookup or the enqueue fails
     */
    public static void enqueueMsg(AQSession aq_sess) throws AQException
    {
        AQQueue                  queue;
        AQMessage                message;
        AQRawPayload             raw_payload;
        AQEnqueueOption          enq_option;
        String                   test_data = "new message";
        byte[]                   b_array;
        Connection               db_conn;

        db_conn = ((AQOracleSession)aq_sess).getDBConnection();

        /* Look up queue table aq_table1 in schema USER; the handle itself
           is not needed below, the lookup is kept from the original flow: */
        aq_sess.getQueueTable ("USER", "aq_table1");
        System.out.println("Successful getQueueTable");  

        /* Get a handle to queue aq_queue1 in schema USER: */
        queue = aq_sess.getQueue ("USER", "aq_queue1");
        System.out.println("Successful getQueue");  

        /* Creating a message to contain raw payload: */
        message = queue.createMessage();

        /* Encode with a fixed charset so the bytes do not depend on the
           platform default; dequeueMsg decodes with the same charset. */
        b_array = test_data.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        /* Get handle to the AQRawPayload object and populate it with raw data: */
        raw_payload = message.getRawPayload();
        raw_payload.setStream(b_array, b_array.length);

        /* Creating a AQEnqueueOption object with default options: */
        enq_option = new AQEnqueueOption();
        /* Enqueue the message: */
        queue.enqueue(enq_option, message);

        try {
            /* Auto-commit is off (see createSession), so the enqueue has to
               be committed explicitly. */
            db_conn.commit();
        } catch (SQLException e) {
            System.out.println("Commit of enqueue failed: " + e);
            e.printStackTrace();
        }
    }

    /**
     * Dequeues one message from USER.aq_queue1, prints its RAW payload as
     * text and commits the transaction.
     *
     * @param aq_sess open AQ session (must be an AQOracleSession)
     * @throws AQException if a queue lookup or the dequeue fails
     */
    public static void dequeueMsg(AQSession aq_sess) throws AQException
    {
        AQQueue                  queue;
        AQMessage                message;
        AQRawPayload             raw_payload;
        AQDequeueOption          deq_option;
        byte[]                   b_array;
        Connection               db_conn;

        db_conn = ((AQOracleSession)aq_sess).getDBConnection();

        /* Look up queue table aq_table1 in schema USER; the handle itself
           is not needed below, the lookup is kept from the original flow: */
        aq_sess.getQueueTable ("USER", "aq_table1");
        System.out.println("Successful getQueueTable");  

        /* Get a handle to queue aq_queue1 in schema USER: */
        queue = aq_sess.getQueue ("USER", "aq_queue1");
        System.out.println("Successful getQueue");  

        /* Creating a AQDequeueOption object with default options: */
        deq_option = new AQDequeueOption();
        /* Dequeue the message: */
        message = queue.dequeue(deq_option);
        raw_payload = message.getRawPayload();
        b_array = raw_payload.getBytes();
        /* Decode with the same fixed charset enqueueMsg used for encoding. */
        String msg = new String(b_array, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("Dequeue Msg "+msg);

        try {
            /* Auto-commit is off (see createSession), so the dequeue has to
               be committed explicitly. */
            db_conn.commit();
        } catch (SQLException e) {
            System.out.println("Commit of dequeue failed: " + e);
            e.printStackTrace();
        }
    }
}

I have created a Queue Table and a Queue. Message was written and read from the queue.

Q1. Can I write one more message to the same queue and read it back? If yes, how do I do it? I tried writing another message to the same queue but couldn't.

Q2. How do I convert my code above to publish-subscribe? How can I test it by reading the same message multiple times (once per subscriber)?

Any help appreciated.


Solution

  • 1) Log in to the Oracle database and create a user.

    -- Create the demo schema/user "jmsuser" with password "a".
    CREATE USER jmsuser IDENTIFIED BY a;
    -- Grant the DBA role plus the AQ administrator and AQ user roles.
    -- NOTE(review): DBA is a very broad role; presumably only used here to keep
    -- the demo simple - confirm before using outside a sandbox.
    GRANT DBA, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to jmsuser;
    -- Allow the user to execute the AQ administration and operation packages.
    GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
    GRANT EXECUTE ON DBMS_AQ TO jmsuser;
    -- Allow the user to execute the LOB and JMS PL/SQL support packages.
    GRANT EXECUTE ON DBMS_LOB TO jmsuser;
    GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;
    

    2) A class that creates a multi-consumer queue and registers two subscribers for it. (ConnectionDefinition.getOracleConnection() returns a regular JDBC connection to Oracle.)

    import java.sql.Connection;
    import oracle.AQ.AQAgent;
    import oracle.AQ.AQDriverManager;
    import oracle.AQ.AQQueue;
    import oracle.AQ.AQQueueProperty;
    import oracle.AQ.AQQueueTable;
    import oracle.AQ.AQQueueTableProperty;
    import oracle.AQ.AQSession;
    
    /**
     * Creates the multi-consumer queue table aq_table5 (payload type
     * SYS.AQ$_JMS_BYTES_MESSAGE) and the queue aq_queue5 in schema jmsuser,
     * registers the subscribers GREEN and RED on it and starts the queue.
     *
     * @author alukasiewicz
     */
    public class NewClass {

        /**
         * Runs the one-time queue setup.
         *
         * @param args not used
         * @throws Exception if the driver cannot be loaded or any AQ/JDBC call fails
         */
        public static void main(String[] args) throws Exception {
            Class.forName("oracle.AQ.AQOracleDriver");
            Connection con = ConnectionDefinition.getOracleConnection();
            try {
                AQSession aq_sess = AQDriverManager.createAQSession(con);
                try {
                    // Multi-consumer queue table, so each message can be
                    // consumed once per subscriber.
                    AQQueueTableProperty qtable_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");
                    qtable_prop.setMultiConsumer(true);
                    AQQueueTable q_table = aq_sess.createQueueTable("jmsuser", "aq_table5", qtable_prop);

                    AQQueueProperty queue_prop = new AQQueueProperty();
                    AQQueue queue = aq_sess.createQueue(q_table, "aq_queue5", queue_prop);
                    System.out.println("Successful createQueue");

                    // Subscriber names must match the ones the Subscriber
                    // class reads with ("GREEN" and "RED").
                    AQAgent subs1 = new AQAgent("GREEN", "", 0);
                    AQAgent subs2 = new AQAgent("RED", "", 0);
                    queue.addSubscriber(subs2, null);
                    queue.addSubscriber(subs1, null);

                    // Report the start only after the queue has really been started.
                    queue.start();
                    System.out.println("Successful start queue");
                } finally {
                    aq_sess.close();
                }
            } finally {
                con.close();
            }
        }
    }
    

    3) A class that publishes messages to the queue.

        /**
         * Publishes two JMS bytes messages to topic JMSUSER.AQ_QUEUE5: one
         * without a recipient list and one addressed to subscriber GREEN only.
         */
        public class Publisher {

            /**
             * Publishes the two messages and commits them.
             *
             * @param args not used
             * @throws Exception if the driver cannot be loaded or any JMS/JDBC call fails
             */
            public static void main(String[] args) throws Exception {
                Class.forName("oracle.AQ.AQOracleDriver");
                Connection con = ConnectionDefinition.getOracleConnection();
                try {
                    TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(con);
                    try {
                        tc_conn.start();
                        TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
                        Topic queueTopic = ((AQjmsSession) jms_sess).getTopic("JMSUSER", "AQ_QUEUE5");
                        AQjmsTopicPublisher publisherAq = (AQjmsTopicPublisher) jms_sess.createPublisher(queueTopic);

                        BytesMessage messAll = jms_sess.createBytesMessage();
                        messAll.writeUTF("Message for all subscribers");
                        BytesMessage messOnlyForGreen = jms_sess.createBytesMessage();
                        messOnlyForGreen.writeUTF("Message only for green");

                        // No recipient list: published to the topic as a whole.
                        publisherAq.publish(messAll);
                        // Explicit recipient list: addressed to agent GREEN only.
                        publisherAq.publish(messOnlyForGreen, new AQjmsAgent[]{new AQjmsAgent("GREEN", null)});

                        con.commit();
                    } finally {
                        // Close the JMS connection even when publishing fails.
                        tc_conn.close();
                    }
                } finally {
                    // Always release the JDBC connection.
                    con.close();
                }
            }
        }
    

    In Oracle you can view these messages in the queue: two for GREEN and one for RED.

        -- List queue name, message state and consumer name for each row of jmsuser.aq$aq_table5.
        SELECT a.queue,  a.msg_state, a.consumer_name FROM jmsuser.aq$aq_table5 a
    

    4) A class that reads messages from the queue:

    /**
     * Reads all currently available messages from topic jmsuser.AQ_QUEUE5,
     * first as durable subscriber GREEN and then as durable subscriber RED,
     * and prints each message body to standard error.
     */
    public class Subscriber {

        /**
         * Drains both subscribers and commits the dequeues.
         *
         * @param args not used
         * @throws Exception if the driver cannot be loaded or any JMS/JDBC call fails
         */
        public static void main(String[] args) throws Exception {
            Class.forName("oracle.AQ.AQOracleDriver");
            Connection con = ConnectionDefinition.getOracleConnection();
            try {
                TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(con);
                try {
                    TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
                    tc_conn.start();
                    Topic queueTopic = ((AQjmsSession) jms_sess).getTopic("jmsuser", "AQ_QUEUE5");
                    TopicSubscriber subGreen = (TopicSubscriber) ((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "GREEN");
                    TopicSubscriber subRed = (TopicSubscriber) ((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "RED");

                    System.err.println("Start receiving message for green subscriber");
                    printAll(subGreen, "     GREEN recive message ");
                    System.err.println("End receiving message for green subscriber");
                    System.err.println("  ");
                    System.err.println("Start receiving message for red subscriber");
                    printAll(subRed, "     RED recive message ");
                    System.err.println("End receiving message for red subscriber");

                    con.commit();
                } finally {
                    // Close the JMS connection even when receiving fails.
                    tc_conn.close();
                }
            } finally {
                // Always release the JDBC connection.
                con.close();
            }
        }

        /**
         * Receives messages from the subscriber until a receive times out
         * (returns null) and prints each body with the given prefix.
         *
         * @param subscriber subscriber to read from
         * @param prefix text printed in front of every message body
         * @throws Exception if receiving or reading a message fails
         */
        private static void printAll(TopicSubscriber subscriber, String prefix) throws Exception {
            // receive(10) waits at most 10 milliseconds for the next message.
            Message msg = subscriber.receive(10);
            while (msg != null) {
                System.err.println(prefix + ((BytesMessage) msg).readUTF());
                msg = subscriber.receive(10);
            }
        }
    }
    

    5) Pom dependencies

     <!-- Libraries used by the examples above. -->
     <!-- NOTE(review): the com.oracle artifacts are presumably not available from
          a public repository and have to be installed into a local/company
          repository first - confirm. -->
     <dependencies>
            <!-- Oracle JDBC driver -->
            <dependency>
                <groupId>com.oracle</groupId>
                <artifactId>ojdbc6</artifactId>
                <version>11.2.0.4</version>
            </dependency>
            <!-- Oracle AQ API -->
            <dependency>
                <groupId>com.oracle</groupId>
                <artifactId>aqapi</artifactId>
                <version>13</version>
            </dependency>
            <!-- JMS 1.1 API -->
            <dependency>
                <groupId>javax.jms</groupId>
                <artifactId>jms</artifactId>
                <version>1.1</version>
            </dependency>
            <!-- JTA 1.1 API -->
            <dependency>
                <groupId>javax.transaction</groupId>
                <artifactId>jta</artifactId>
                <version>1.1</version>
            </dependency>
            <!-- Oracle orai18n library -->
            <dependency>
                <groupId>com.oracle</groupId>
                <artifactId>orai18n</artifactId>
                <version>11.2.0.4</version>
            </dependency>
        </dependencies>