Skip Headers
Oracle® Streams Advanced Queuing User's Guide and Reference
10g Release 2 (10.2)

Part Number B14257-01
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Contact Us

Go to previous page
Previous
Go to next page
Next
PDF · Mobi · ePub

10 Oracle Streams AQ Operations Using PL/SQL

This chapter describes the Oracle Streams Advanced Queuing (AQ) PL/SQL operational interface.

This chapter contains these topics:

See Also:

10.1 Using Secure Queues

For secure queues, you must specify the sender_id in the messages_properties parameter. See "MESSAGE_PROPERTIES_T Type" in PL/SQL Packages and Types Reference for more information about sender_id.

When you use secure queues, the following are required:

10.2 Enqueuing Messages

DBMS_AQ.ENQUEUE(
   queue_name          IN      VARCHAR2,
   enqueue_options     IN      enqueue_options_t,
   message_properties  IN      message_properties_t,
   payload             IN      "type_name",
   msgid               OUT     RAW);

This procedure adds a message to the specified queue.

It is not possible to update the message payload after a message has been enqueued. If you want to change the message payload, then you must dequeue the message and enqueue a new message.

To store a payload of type RAW, Oracle Streams AQ creates a queue table with LOB column as the payload repository. The maximum size of the payload is determined by which programmatic interface you use to access Oracle Streams AQ. For PL/SQL, Java and precompilers the limit is 32K; for the OCI the limit is 4G.

If a message is enqueued to a multiconsumer queue with no recipient and the queue has no subscribers (or rule-based subscribers that match this message), then Oracle error ORA 24033 is raised. This is a warning that the message will be discarded because there are no recipients or subscribers to whom it can be delivered.

If several messages are enqueued in the same second, then they all have the same enq_time. In this case the order in which messages are dequeued depends on step_no, a variable that is monotonically increasing for each message that has the same enq_time. There is no situation when both enq_time and step_no are the same for two messages enqueued in the same session.

Enqueue Options

The enqueue_options parameter specifies the options available for the enqueue operation. It has the following attributes:

Message Properties

The message_properties parameter contains the information that Oracle Streams AQ uses to manage individual messages. It has the following attributes:

The examples in this chapter use the same users, message types, queue tables, and queues as do the examples in Chapter 8, "Oracle Streams AQ Administrative Interface". If you have not already created these structures in your test environment, then you must run the following examples:

For Example 8-1, you must connect as a user with administrative privileges. For the other examples in the preceding list, you can connect as user test_adm. After you have created the queues, you must start them as shown in "Starting a Queue". Except as noted otherwise, you can connect as ordinary queue user 'test' to run all examples appearing in this chapter.

Example 10-1 Enqueuing a Message, Specifying Queue Name and Payload

DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
   message             test.message_typ;
BEGIN
   message := test.message_typ(001, 'TEST MESSAGE', 'First message to obj_queue');
   DBMS_AQ.ENQUEUE(
      queue_name              => 'test.obj_queue',
      enqueue_options         => enqueue_options,
      message_properties      => message_properties,
      payload                 => message,
      msgid                   => message_handle);
   COMMIT;
END;
/

Example 10-2 Enqueuing a Message, Specifying Priority

DECLARE 
   enqueue_options       DBMS_AQ.enqueue_options_t; 
   message_properties    DBMS_AQ.message_properties_t; 
   message_handle        RAW(16); 
   message               test.order_typ; 
BEGIN 
   message := test.order_typ(002, 'PRIORITY MESSAGE', 'priority 30'); 
   message_properties.priority := 30; 
   DBMS_AQ.ENQUEUE(
      queue_name              => 'test.priority_queue', 
      enqueue_options         => enqueue_options, 
      message_properties      => message_properties, 
      payload                 => message, 
      msgid                   => message_handle); 
   COMMIT; 
END;
/

Enqueuing a LOB Type Message

Example 10-3 creates procedure blobenqueue() using the test.lob_type message payload object type created in Example 8-1. On enqueue, the LOB attribute is set to EMPTY_BLOB. After the enqueue completes, but before the transaction is committed, the LOB attribute is selected from the user_data column of the test.lob_qtab queue table. The LOB data is written to the queue using the LOB interfaces (which are available through both OCI and PL/SQL). The actual enqueue operation is shown in

On dequeue, the message payload will contain the LOB locator. You can use this LOB locator after the dequeue, but before the transaction is committed, to read the LOB data. This is shown in Example 10-14.

Example 10-3 Creating an Enqueue Procedure for LOB Type Messages

CREATE OR REPLACE PROCEDURE blobenqueue(msgno IN NUMBER) AS
   enq_userdata          test.lob_typ; 
   enq_msgid             RAW(16); 
   enqueue_options       DBMS_AQ.enqueue_options_t; 
   message_properties    DBMS_AQ.message_properties_t; 
   lob_loc               BLOB; 
   buffer                RAW(4096); 
BEGIN 
   buffer       := HEXTORAW(RPAD('FF', 4096, 'FF')); 
   enq_userdata := test.lob_typ(msgno, 'Large Lob data', EMPTY_BLOB(), msgno); 
   DBMS_AQ.ENQUEUE(
      queue_name          => 'test.lob_queue',
      enqueue_options     => enqueue_options,
      message_properties  => message_properties,
      payload             => enq_userdata, 
      msgid               => enq_msgid); 
   SELECT t.user_data.data INTO lob_loc 
      FROM lob_qtab t 
      WHERE t.msgid = enq_msgid; 
   DBMS_LOB.WRITE(lob_loc, 2000, 1, buffer ); 
   COMMIT; 
END;
/ 

Example 10-4 Enqueuing a LOB Type Message

BEGIN 
   FOR i IN 1..5 LOOP 
      blobenqueue(i); 
   END LOOP; 
END;
/

Enqueuing Multiple Messages to a Single-Consumer Queue

Example 10-5 enqueues six messages to test.obj_queue. These messages are dequeued in Example 10-17.

Example 10-5 Enqueuing Multiple Messages

SET SERVEROUTPUT ON
DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
   message             test.message_typ;
BEGIN
   message := test.message_typ(001, 'ORANGE', 'ORANGE enqueued first.');
   DBMS_AQ.ENQUEUE(
         queue_name           => 'test.obj_queue', 
         enqueue_options      => enqueue_options,
         message_properties   => message_properties,
         payload              => message,
         msgid                => message_handle);
   message := test.message_typ(001, 'ORANGE', 'ORANGE also enqueued second.');
   DBMS_AQ.ENQUEUE(
         queue_name           => 'test.obj_queue', 
         enqueue_options      => enqueue_options,
         message_properties   => message_properties,
         payload              => message,
         msgid                => message_handle);
   message := test.message_typ(001, 'YELLOW', 'YELLOW enqueued third.');
   DBMS_AQ.ENQUEUE(
         queue_name           => 'test.obj_queue', 
         enqueue_options      => enqueue_options,
         message_properties   => message_properties,
         payload              => message,
         msgid                => message_handle);
   message := test.message_typ(001, 'VIOLET', 'VIOLET enqueued fourth.');
   DBMS_AQ.ENQUEUE(
         queue_name           => 'test.obj_queue', 
         enqueue_options      => enqueue_options,
         message_properties   => message_properties,
         payload              => message,
         msgid                => message_handle);
   message := test.message_typ(001, 'PURPLE', 'PURPLE enqueued fifth.');
   DBMS_AQ.ENQUEUE(
         queue_name           => 'test.obj_queue', 
         enqueue_options      => enqueue_options,
         message_properties   => message_properties,
         payload              => message,
         msgid                => message_handle);
   message := test.message_typ(001, 'PINK', 'PINK enqueued sixth.');
   DBMS_AQ.ENQUEUE(
         queue_name           => 'test.obj_queue', 
         enqueue_options      => enqueue_options,
         message_properties   => message_properties,
         payload              => message,
         msgid                => message_handle);
   COMMIT;
END;
/

Enqueuing Multiple Messages to a Multiconsumer Queue

Example 10-6 requires that you connect as user 'test_adm' to add subscribers RED and GREEN to queue test.multiconsumer_queue. The subscribers are required for Example 10-7.

Example 10-6 Adding Subscribers RED and GREEN

DECLARE
   subscriber         sys.aq$_agent;
BEGIN
   subscriber     :=  sys.aq$_agent('RED', NULL, NULL);
   DBMS_AQADM.ADD_SUBSCRIBER(
      queue_name  =>  'test.multiconsumer_queue',
      subscriber  =>  subscriber);

   subscriber     :=  sys.aq$_agent('GREEN', NULL, NULL);
   DBMS_AQADM.ADD_SUBSCRIBER(
      queue_name  =>  'test.multiconsumer_queue',
      subscriber  =>  subscriber); 
END;
/

Example 10-7 enqueues multiple messages from sender 001. MESSAGE 1 is intended for all queue subscribers. MESSAGE 2 is intended for RED and BLUE. These messages are dequeued in Example 10-17.

Example 10-7 Enqueuing Multiple Messages to a Multiconsumer Queue

DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   recipients          DBMS_AQ.aq$_recipient_list_t;
   message_handle      RAW(16);
   message             test.message_typ;
BEGIN
   message := test.message_typ(001, 'MESSAGE 1','For queue subscribers');
   DBMS_AQ.ENQUEUE(
      queue_name          => 'test.multiconsumer_queue',
      enqueue_options     => enqueue_options,
      message_properties  => message_properties,
      payload             => message,
      msgid               => message_handle);

   message := test.message_typ(001, 'MESSAGE 2', 'For two recipients');
   recipients(1) := sys.aq$_agent('RED', NULL, NULL);
   recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
   message_properties.recipient_list := recipients;
   DBMS_AQ.ENQUEUE(
      queue_name          => 'test.multiconsumer_queue',
      enqueue_options     => enqueue_options,
      message_properties  => message_properties,
      payload             => message,
      msgid               => message_handle);
   COMMIT;
END;
/

Enqueuing Grouped Messages

Example 10-8 enqueues three groups of messages, with three messages in each group. These messages are dequeued in Example 10-16.

Example 10-8 Enqueuing Grouped Messages

DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
   message             test.message_typ;
BEGIN
  FOR groupno in 1..3 LOOP
    FOR msgno in 1..3 LOOP
      message := test.message_typ(
               001,
               'GROUP ' || groupno, 
               'Message ' || msgno || ' in group ' || groupno);
      DBMS_AQ.ENQUEUE(
         queue_name             => 'test.group_queue',
         enqueue_options        => enqueue_options,
         message_properties     => message_properties,
         payload                => message,
         msgid                  => message_handle);
    END LOOP;
    COMMIT; 
  END LOOP;
END;
/

Enqueuing a Message with Delay and Expiration

In Example 10-9, an application wants a message to be dequeued no earlier than a week from now, but no later than three weeks from now. Because expiration is calculated from the earliest dequeue time, this requires setting the expiration time for two weeks.

Example 10-9 Enqueuing a Message, Specifying Delay and Expiration

DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
   message             test.message_typ;
BEGIN
   message := test.message_typ(001, 'DELAYED', 'Message is delayed one week.');
   message_properties.delay      := 7*24*60*60;
   message_properties.expiration := 2*7*24*60*60;
   DBMS_AQ.ENQUEUE(
      queue_name           => 'test.obj_queue',
      enqueue_options      => enqueue_options,
      message_properties   => message_properties,
      payload              => message,
      msgid                => message_handle);
   COMMIT;
END;
/

Example 10-10 Enqueuing a Message, Specifying a Transformation

DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
   message             test.message_typ;
BEGIN
   message := test.message_typ(001, 'NORMAL MESSAGE', 'enqueued to obj_queue');
   enqueue_options.transformation := 'message_order_transform';
   DBMS_AQ.ENQUEUE(
      queue_name              => 'test.priority_queue', 
      enqueue_options         => enqueue_options,
      message_properties      => message_properties,
      payload                 => message,
      msgid                   => message_handle);
   COMMIT;
END;
/

See Also:

"Using Advanced Queuing Interfaces" in Oracle Objects for OLE Developer's Guide for OO4O message-enqueuing examples

10.3 Enqueuing an Array of Messages

DBMS_AQ.ENQUEUE_ARRAY(
   queue_name                IN   VARCHAR2,
   enqueue_options           IN   enqueue_options_t,
   array_size                IN   PLS_INTEGER,
   message_properties_array  IN   message_properties_array_t,
   payload_array             IN   VARRAY,
   msid_array                OUT  msgid_array_t)
RETURN PLS_INTEGER;

Use the ENQUEUE_ARRAY function to enqueue an array of payloads using a corresponding array of message properties. The output is an array of message identifiers of the enqueued messages. The function returns the number of messages successfully enqueued.

Array enqueuing is not supported for buffered messages, but you can still use DBMS_AQ.ENQUEUE_ARRAY() to enqueue buffered messages by setting array_size to 1.

The message_properties_array parameter is an array of message properties. Each element in the payload array must have a corresponding element in this record. All messages in an array have the same delivery mode.

The payload structure can be a VARRAY or nested table. The message IDs are returned into an array of RAW(16) entries of type DBMS_AQ.msgid_array_t.

As with array operations in the relational world, it is not possible to provide a single optimum array size that will be correct in all circumstances. Application developers must experiment with different array sizes to determine the optimal value for their particular applications.

Example 10-11 Enqueuing an Array of Messages

DECLARE
  enqueue_options       DBMS_AQ.enqueue_options_t;
  msg_prop_array        DBMS_AQ.message_properties_array_t;
  msg_prop              DBMS_AQ.message_properties_t;
  payload_array         test.msg_table;
  msgid_array           DBMS_AQ.msgid_array_t;
  retval                PLS_INTEGER;
BEGIN
  payload_array  := msg_table(
      message_typ(001, 'MESSAGE  1', 'array enqueued to obj_queue'), 
      message_typ(001, 'MESSAGE  2', 'array enqueued to obj_queue'));
  msg_prop_array := DBMS_AQ.message_properties_array_t(msg_prop, msg_prop);
 
  retval := DBMS_AQ.ENQUEUE_ARRAY( 
                 queue_name               => 'test.obj_queue',
                 enqueue_options          => enqueue_options,
                 array_size               => 2,
                 message_properties_array => msg_prop_array,
                 payload_array            => payload_array,
                 msgid_array              => msgid_array);
  COMMIT;
END;/

10.4 Listening to One or More Queues

DBMS_AQ.LISTEN(
   agent_list             IN    aq$_agent_list_t,
   wait                   IN    BINARY_INTEGER DEFAULT FOREVER, 
   listen_delivery_mode   IN    PLS_INTEGER DEFAULT PERSISTENT,
   agent                  OUT   sys.aq$_agent
   message_delivery_mode  OUT   PLS_INTEGER);

TYPE aq$_agent_list_t IS TABLE of aq$_agent INDEXED BY BINARY_INTEGER;

This procedure specifies which queue or queues to monitor.

This call takes a list of agents as an argument. Each agent is identified by a unique combination of name, address, and protocol.

See Also:

"AQ Agent Type"

You specify the queue to be monitored in the address field of each agent listed. Agents must have dequeue privileges on each monitored queue. You must specify the name of the agent when monitoring multiconsumer queues; but you must not specify an agent name for single-consumer queues. Only local queues are supported as addresses. Protocol is reserved for future use.

Note:

Listening to multiconsumer queues is not supported in the Java API.

The listen_delivery_mode parameter specifies what types of message interest the agent. If it is the default PERSISTENT, then the agent is informed about persistent messages only. If it is set to BUFFERED, then the agent is informed about buffered messages only. If it is set to PERSISTENT_OR_BUFFERED, then the agent is informed about both types.

This is a blocking call that returns the agent and message type when there is a message ready for consumption for an agent in the list. If there are messages for more than one agent, then only the first agent listed is returned. If there are no messages found when the wait time expires, then an error is raised.

A successful return from the listen call is only an indication that there is a message for one of the listed agents in one of the specified queues. The interested agent must still dequeue the relevant message.

Note:

You cannot call LISTEN on nonpersistent queues.

Example 10-12 Listening to a Single-Consumer Queue with Zero Timeout

SET SERVEROUTPUT ON
DECLARE
   agent            sys.aq$_agent;
   test_agent_list  DBMS_AQ.aq$_agent_list_t;
BEGIN
   test_agent_list(1) := sys.aq$_agent(NULL, 'test.obj_queue',  NULL);
   test_agent_list(2) := sys.aq$_agent(NULL, 'test.priority_queue', NULL);
   DBMS_AQ.LISTEN(
      agent_list   =>   test_agent_list, 
      wait         =>   0, 
      agent        =>   agent);
   DBMS_OUTPUT.PUT_LINE('Message in Queue: ' ||  agent.address);
END;
/

Even though both test.obj_queue and test.priority_queue contain messages (enqueued in Example 10-1 and Example 10-2 respectively) Example 10-12 returns only:

Message in Queue: "TEST"."OBJ_QUEUE"

If the order of agents in test_agent_list is reversed, so test.priority_queue appears before test.obj_queue, then the example returns:

Message in Queue: "TEST"."PRIORITY_QUEUE"

10.5 Dequeuing Messages

DBMS_AQ.DEQUEUE(
   queue_name          IN      VARCHAR2,
   dequeue_options     IN      dequeue_options_t,
   message_properties  OUT     message_properties_t,
   payload             OUT     "type_name",
   msgid               OUT     RAW);

This procedure dequeues a message from the specified queue. Beginning with Oracle Streams AQ 10g Release 2 (10.2), you can choose to dequeue only persistent messages, only buffered messages, or both. See delivery_mode in the following list of dequeue options.

Dequeue Options

The dequeue_options parameter specifies the options available for the dequeue operation. It has the following attributes:

The dequeue order is determined by the values specified at the time the queue table is created unless overridden by the message identifier and correlation identifier in dequeue options.

The database consistent read mechanism is applicable for queue operations. For example, a BROWSE call may not see a message that is enqueued after the beginning of the browsing transaction.

In a commit-time queue, a new feature of Oracle Streams AQ 10g Release 2 (10.2), messages are not visible to BROWSE or DEQUEUE calls until a deterministic order can be established among them based on an approximate CSCN.

If the navigation attribute of the dequeue_conditions parameter is NEXT_MESSAGE (the default), then subsequent dequeues retrieve messages from the queue based on the snapshot obtained in the first dequeue. A message enqueued after the first dequeue command, therefore, will be processed only after processing all remaining messages in the queue. This is not a problem if all the messages have already been enqueued or if the queue does not have priority-based ordering. But if an application must process the highest-priority message in the queue, then it must use the FIRST_MESSAGE navigation option.

Note:

It can also be more efficient to use the FIRST_MESSAGE navigation option when there are messages being concurrently enqueued. If the FIRST_MESSAGE option is not specified, then Oracle Streams AQ continually generates the snapshot as of the first dequeue command, leading to poor performance. If the FIRST_MESSAGE option is specified, then Oracle Streams AQ uses a new snapshot for every dequeue command.

Messages enqueued in the same transaction into a queue that has been enabled for message grouping form a group. If only one message is enqueued in the transaction, then this effectively forms a group of one message. There is no upper limit to the number of messages that can be grouped in a single transaction.

In queues that have not been enabled for message grouping, a dequeue in LOCKED or REMOVE mode locks only a single message. By contrast, a dequeue operation that seeks to dequeue a message that is part of a group locks the entire group. This is useful when all the messages in a group must be processed as a unit.

When all the messages in a group have been dequeued, the dequeue returns an error indicating that all messages in the group have been processed. The application can then use NEXT_TRANSACTION to start dequeuing messages from the next available group. In the event that no groups are available, the dequeue times out after the period specified in the wait attribute of dequeue_options.

Typically, you expect the consumer of messages to access messages using the dequeue interface. You can view processed messages or messages still to be processed by browsing by message ID or by using SELECT commands.

Example 10-13 returns the message enqueued in Example 10-1. It returns:

From Sender No.1
Subject: TEST MESSAGE
Text: First message to obj_queue

Example 10-13 Dequeuing Object Type Messages

SET SERVEROUTPUT ON
DECLARE
dequeue_options     DBMS_AQ.dequeue_options_t;
message_properties  DBMS_AQ.message_properties_t;
message_handle      RAW(16);
message             test.message_typ;
BEGIN
   dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
   DBMS_AQ.DEQUEUE(
      queue_name          =>     'test.obj_queue',
      dequeue_options     =>     dequeue_options,
      message_properties  =>     message_properties,
      payload             =>     message,
      msgid               =>     message_handle);
   DBMS_OUTPUT.PUT_LINE('From Sender No.'|| message.sender_id);
   DBMS_OUTPUT.PUT_LINE('Subject: '||message.subject);
   DBMS_OUTPUT.PUT_LINE('Text: '||message.text);
   COMMIT;
END;
/

Dequeuing LOB Type Messages

Example 10-14 creates procedure blobdequeue() to dequeue the LOB type messages enqueued in Example 10-4. The actual dequeue is shown in Example 10-15. It returns:

Amount of data read: 2000
Amount of data read: 2000
Amount of data read: 2000
Amount of data read: 2000
Amount of data read: 2000

Example 10-14 Creating a Dequeue Procedure for LOB Type Messages

CREATE OR REPLACE PROCEDURE blobdequeue(msgno IN NUMBER) AS
   dequeue_options     DBMS_AQ.dequeue_options_t; 
   message_properties  DBMS_AQ.message_properties_t; 
   msgid               RAW(16); 
   payload             test.lob_typ; 
   lob_loc             BLOB; 
   amount              BINARY_INTEGER; 
   buffer              RAW(4096); 
BEGIN 
   DBMS_AQ.DEQUEUE(
      queue_name          =>  'test.lob_queue',
      dequeue_options     =>   dequeue_options, 
      message_properties  =>   message_properties, 
      payload             =>   payload,
      msgid               =>   msgid); 
   lob_loc                :=   payload.data;
   amount                 :=   2000; 
   DBMS_LOB.READ(lob_loc, amount, 1, buffer);
   DBMS_OUTPUT.PUT_LINE('Amount of data read: '|| amount); 
   COMMIT;
END;
/

Example 10-15 Dequeuing LOB Type Messages

BEGIN 
   FOR i IN 1..5 LOOP 
     blobdequeue(i); 
   END LOOP; 
END;
/

Dequeuing Grouped Messages

You can dequeue the grouped messages enqueued in Example 10-8 by running Example 10-16. It returns:

GROUP 1: Message 1 in group 1
GROUP 1: Message 2 in group 1
GROUP 1: Message 3 in group 1
Finished GROUP 1
GROUP 2: Message 1 in group 2
GROUP 2: Message 2 in group 2
GROUP 2: Message 3 in group 2
Finished GROUP 2
GROUP 3: Message 1 in group 3
GROUP 3: Message 2 in group 3
GROUP 3: Message 3 in group 3
Finished GROUP 3
No more messages

Example 10-16 Dequeuing Grouped Messages

SET SERVEROUTPUT ON
DECLARE
   dequeue_options       DBMS_AQ.dequeue_options_t;
   message_properties    DBMS_AQ.message_properties_t;
   message_handle        RAW(16);
   message               test.message_typ;
   no_messages           exception;
   end_of_group          exception;
   PRAGMA EXCEPTION_INIT (no_messages, -25228);
   PRAGMA EXCEPTION_INIT (end_of_group, -25235);
BEGIN
   dequeue_options.wait       := DBMS_AQ.NO_WAIT;
   dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
   LOOP
     BEGIN
     DBMS_AQ.DEQUEUE(
        queue_name         => 'test.group_queue',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle);
     DBMS_OUTPUT.PUT_LINE(message.subject || ': ' || message.text );
     dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE;
     EXCEPTION
       WHEN end_of_group THEN
         DBMS_OUTPUT.PUT_LINE ('Finished ' || message.subject);
         COMMIT;
         dequeue_options.navigation := DBMS_AQ.NEXT_TRANSACTION;
     END;
   END LOOP;
   EXCEPTION
     WHEN no_messages THEN
       DBMS_OUTPUT.PUT_LINE ('No more messages');
END;
/

Dequeuing from a Multiconsumer Queue

You can dequeue the messages enqueued for RED in Example 10-7 by running Example 10-17. If you change RED to GREEN and then to BLUE, you can use it to dequeue their messages as well. The output of the example will be different in each case.

RED is a subscriber to the multiconsumer queue and is also a specified recipient of MESSAGE 2, so it gets both messages:

Message: MESSAGE 1 .. For queue subscribers
Message: MESSAGE 2 .. For two recipients
No more messages for RED

GREEN is only a subscriber, so it gets only those messages in the queue for which no recipients have been specified (in this case, MESSAGE 1):

Message: MESSAGE 1 .. For queue subscribers
No more messages for GREEN

BLUE, while not a subscriber to the queue, is nevertheless specified to receive MESSAGE 2.

Message: MESSAGE 2 .. For two recipients
No more messages for BLUE

Example 10-17 Dequeuing Messages for RED from a Multiconsumer Queue

SET SERVEROUTPUT ON
DECLARE
  dequeue_options       DBMS_AQ.dequeue_options_t;
  message_properties    DBMS_AQ.message_properties_t;
  message_handle        RAW(16);
  message               test.message_typ;
  no_messages           exception;
  PRAGMA EXCEPTION_INIT (no_messages, -25228);
BEGIN
  dequeue_options.wait          := DBMS_AQ.NO_WAIT;
  dequeue_options.consumer_name := 'RED';
  dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
  LOOP
   BEGIN
    DBMS_AQ.DEQUEUE(
      queue_name         => 'test.multiconsumer_queue',
      dequeue_options    => dequeue_options,
      message_properties => message_properties,
      payload            => message,
      msgid              => message_handle);
    DBMS_OUTPUT.PUT_LINE('Message: '|| message.subject ||' .. '|| message.text );
    dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE;
   END;
  END LOOP;
  EXCEPTION
    WHEN no_messages THEN
    DBMS_OUTPUT.PUT_LINE ('No more messages for RED');
  COMMIT;
END;
/

Example 10-18 browses messages enqueued in Example 10-5 until it finds PINK, which it removes. The example returns:

Browsed Message Text: ORANGE enqueued first.
Browsed Message Text: ORANGE also enqueued second.
Browsed Message Text: YELLOW enqueued third.
Browsed Message Text: VIOLET enqueued fourth.
Browsed Message Text: PURPLE enqueued fifth.
Browsed Message Text: PINK enqueued sixth.
Removed Message Text: PINK enqueued sixth.

Dequeue Modes

Example 10-18 Dequeue in Browse Mode and Remove Specified Message

SET SERVEROUTPUT ON
DECLARE
   dequeue_options     DBMS_AQ.dequeue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
   message             test.message_typ;
BEGIN
   dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
   LOOP
      DBMS_AQ.DEQUEUE(
        queue_name              => 'test.obj_queue',
        dequeue_options         => dequeue_options,
        message_properties      => message_properties,
        payload                 => message,
        msgid                   => message_handle);
      DBMS_OUTPUT.PUT_LINE ('Browsed Message Text: ' || message.text);
      EXIT WHEN message.subject = 'PINK';
   END LOOP;
   dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
   dequeue_options.msgid        := message_handle;
   DBMS_AQ.DEQUEUE(
           queue_name           => 'test.obj_queue',
           dequeue_options      => dequeue_options,
           message_properties   => message_properties,
           payload              => message,
           msgid                => message_handle);
   DBMS_OUTPUT.PUT_LINE('Removed Message Text: ' || message.text);
   COMMIT;
END;
/

Example 10-19 previews in locked mode the messages enqueued in Example 10-5 until it finds PURPLE, which it removes. The example returns:

Locked Message Text: ORANGE enqueued first.
Locked Message Text: ORANGE also enqueued second.
Locked Message Text: YELLOW enqueued third.
Locked Message Text: VIOLET enqueued fourth.
Locked Message Text: PURPLE enqueued fifth.
Removed Message Text: PURPLE enqueued fifth.

Example 10-19 Dequeue in Locked Mode and Remove Specified Message

SET SERVEROUTPUT ON
DECLARE
   dequeue_options     DBMS_AQ.dequeue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   message_handle      RAW(16);
   message             test.message_typ;
BEGIN
   dequeue_options.dequeue_mode := DBMS_AQ.LOCKED;
   LOOP
      DBMS_AQ.dequeue(
        queue_name         => 'test.obj_queue',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle);
      DBMS_OUTPUT.PUT_LINE('Locked Message Text: ' || message.text);
      EXIT WHEN message.subject = 'PURPLE';
   END LOOP;
   dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
   dequeue_options.msgid        := message_handle;
   DBMS_AQ.DEQUEUE(
     queue_name           => 'test.obj_queue',
     dequeue_options      => dequeue_options,
     message_properties   => message_properties,
     payload              => message,
     msgid                => message_handle);
   DBMS_OUTPUT.PUT_LINE('Removed Message Text: ' || message.text);
   COMMIT;
END;
/

See Also:

"Using Advanced Queuing Interfaces" in Oracle Objects for OLE Developer's Guide for OO4O message-dequeuing examples

10.6 Dequeuing an Array of Messages

DBMS_AQ.DEQUEUE_ARRAY(
   queue_name                IN      VARCHAR2,
   dequeue_options           IN      dequeue_options_t,
   array_size                IN      PLS_INTEGER, 
   message_properties_array  OUT     message_properties_array_t,
   payload_array             OUT     VARRAY,
   msgid_array               OUT     msgid_array_t)
RETURN PLS_INTEGER;

Use the DEQUEUE_ARRAY function to dequeue an array of payloads and a corresponding array of message properties. The output is an array of payloads, message IDs, and message properties of the dequeued messages. The function returns the number of messages successfully dequeued.

Array dequeuing is not supported for buffered messages, but you can still use DBMS_AQ.DEQUEUE_ARRAY() to dequeue buffered messages by setting array_size to 1.

The payload structure can be a VARRAY or nested table. The message identifiers are returned into an array of RAW(16) entries of type DBMS_AQ.msgid_array_t. The message properties are returned into an array of type DBMS_AQ.message_properties_array_t.

As with array operations in the relational world, it is not possible to provide a single optimum array size that will be correct in all circumstances. Application developers must experiment with different array sizes to determine the optimal value for their particular applications.

All dequeue options available with DBMS_AQ.DEQUEUE are also available with DBMS_AQ.DEQUEUE_ARRAY. Beginning with Oracle Streams AQ 10g Release 2 (10.2), you can choose to dequeue only persistent messages, only buffered messages, or both. In addition, the navigation attribute of dequeue_options offers two options specific to DBMS_AQ.DEQUEUE_ARRAY.

When dequeuing messages, you might want to dequeue all the messages for a transaction group with a single call. You might also want to dequeue messages that span multiple transaction groups. You can specify either of these methods by using one of the following navigation methods:

Navigation method NEXT_MESSAGE_ONE_GROUP dequeues messages that match the search criteria from the next available transaction group into an array. Navigation method FIRST_MESSAGE_ONE_GROUP resets the position to the beginning of the queue and dequeues all the messages in a single transaction group that are available and match the search criteria.

The number of messages dequeued is determined by an array size limit. If the number of messages in the transaction group exceeds array_size, then multiple calls to DEQUEUE_ARRAY must be made to dequeue all the messages for the transaction group.

Navigation methods NEXT_MESSAGE_MULTI_GROUP and FIRST_MESSAGE_MULTI_GROUP work like their ONE_GROUP counterparts, but they are not limited to a single transaction group. Each message that is dequeued into the array has an associated set of message properties. Message property transaction_group determines which messages belong to the same transaction group.

Example 10-20 dequeues the messages enqueued in Example 10-11. It returns:

Number of messages dequeued: 2

Example 10-20 Dequeuing an Array of Messages

SET SERVEROUTPUT ON
DECLARE
  dequeue_options       DBMS_AQ.dequeue_options_t;
  msg_prop_array        DBMS_AQ.message_properties_array_t := 
                        DBMS_AQ.message_properties_array_t();
  payload_array         test.msg_table;
  msgid_array           DBMS_AQ.msgid_array_t;
  retval                PLS_INTEGER;
BEGIN
  retval := DBMS_AQ.DEQUEUE_ARRAY( 
              queue_name               => 'test.obj_queue',
              dequeue_options          => dequeue_options,
              array_size               => 2,
              message_properties_array => msg_prop_array,
              payload_array            => payload_array,
              msgid_array              => msgid_array);
  DBMS_OUTPUT.PUT_LINE('Number of messages dequeued: ' || retval);
END;/
 

10.7 Registering for Notification

DBMS_AQ.REGISTER(
   reg_list     IN SYS.AQ$_REG_INFO_LIST,
   reg_count    IN NUMBER);

This procedure registers an e-mail address, user-defined PL/SQL procedure, or HTTP URL for message notification.

Note:

In releases before Oracle Database 10g Release 2 (10.2), the Oracle Streams AQ notification feature was not supported for queues with names longer than 30 characters. This restriction no longer applies. The 24-character limit on names of user-generated queues still applies. See "Creating a Queue".

The reg_list parameter is a list of SYS.AQ$_REG_INFO objects. You can specify notification quality of service, a new feature in Oracle Streams AQ 10g Release 2 (10.2), with the qosflags attribute of SYS.AQ$_REG_INFO.

See Also:

"AQ Registration Information Type" for more information on SYS.AQ$_REG_INFO objects

The reg_count parameter specifies the number of entries in the reg_list. Each subscription requires its own reg_list entry. Interest in several subscriptions can be registered at one time.

When PL/SQL notification is received, the Oracle Streams AQ message properties descriptor that the callback is invoked with specifies the delivery_mode of the message notified as DBMS_AQ.PERSISTENT or DBMS_AQ.BUFFERED.

See Also:

"AQ Notification Descriptor Type" for more information on the message properties descriptor

If you register for e-mail notifications, then you must set the host name and port name for the SMTP server that will be used by the database to send e-mail notifications. If required, you should set the send-from e-mail address, which is set by the database as the sent from field. You need a Java-enabled database to use this feature.

If you register for HTTP notifications, then you might want to set the host name and port number for the proxy server and a list of no-proxy domains that will be used by the database to post HTTP notifications.

An internal queue called SYS.AQ_SRVNTFN_TABLE_Q stores the notifications to be processed by the job queue processes. If notification fails, then Oracle Streams AQ retries the failed notification up to MAX_RETRIES attempts.

Note:

You can change the MAX_RETRIES and RETRY_DELAY properties of SYS.AQ_SRVNTFN_TABLE_Q. The new settings are applied across all notifications.

Example 10-21 Registering for Notifications

DECLARE
  reginfo             sys.aq$_reg_info;
  reg_list            sys.aq$_reg_info_list;
BEGIN
  reginfo := sys.aq$_reg_info(
                      'test.obj_queue',
                      DBMS_AQ.NAMESPACE_ANONYMOUS,
                      'http://www.company.com:8080', 
                      HEXTORAW('FF'));
  reg_list  := sys.aq$_reg_info_list(reginfo);
  DBMS_AQ.REGISTER(
    reg_list     => reg_list, 
    reg_count    => 1);
  COMMIT;
END;
/

10.8 Unregistering for Notification

DBMS_AQ.UNREGISTER(
   reg_list     IN SYS.AQ$_REG_INFO_LIST,
   reg_count    IN NUMBER);

This procedure unregisters an e-mail address, user-defined PL/SQL procedure, or HTTP URL for message notification.

10.9 Posting for Subscriber Notification

DBMS_AQ.POST(
  post_list       IN  SYS.AQ$_POST_INFO_LIST,
  post_count      IN  NUMBER);

This procedure posts to a list of anonymous subscriptions, allowing all clients who are registered for the subscriptions to get notifications of persistent messages. This feature is not supported with buffered messages.

The count parameter specifies the number of entries in the post_list. Each posted subscription must have its own entry in the post_list. Several subscriptions can be posted to at one time.

The post_list parameter specifies the list of anonymous subscriptions to which you want to post. It has three attributes:

This call provides a best-effort guarantee. A notification goes to registered clients at most once. This call is primarily used for lightweight notification. If an application needs more rigid guarantees, then it can enqueue to a queue.

Example 10-22 Posting Object-Type Messages

DECLARE
  postinfo            sys.aq$_post_info;
  post_list           sys.aq$_post_info_list;
BEGIN
  postinfo  := sys.aq$_post_info('test.obj_queue',0,HEXTORAW('FF')); 
  post_list := sys.aq$_post_info_list(postinfo);
  DBMS_AQ.POST(
    post_list       => post_list,
    post_count      => 1);
  COMMIT;
END;
/

10.10 Adding an Agent to the LDAP Server

DBMS_AQ.BIND_AGENT(
   agent        IN SYS.AQ$_AGENT,
   certificate  IN VARCHAR2 default NULL);

This procedure creates an entry for an Oracle Streams AQ agent in the Lightweight Directory Access Protocol (LDAP) server.

The agent parameter specifies the Oracle Streams AQ Agent that is to be registered in LDAP server.

See Also:

"AQ Agent Type"

The certificate parameter specifies the location (LDAP distinguished name) of the OrganizationalPerson entry in LDAP whose digital certificate (attribute usercertificate) is to be used for this agent. For example, "cn=OE, cn=ACME, cn=com" is a distinguished name for a OrganizationalPerson OE whose certificate will be used with the specified agent. If the agent does not have a digital certificate, then this parameter is defaulted to null.

10.11 Removing an Agent from the LDAP Server

DBMS_AQ.UNBIND_AGENT(
   agent    IN SYS.AQ$_AGENT);

This procedure removes the entry for an Oracle Streams AQ agent from the LDAP server.