Oracle

介绍

在使用Oracle高级队列(Advanced Queuing,AQ)时,我在Oracle数据库服务器(确切地说:Oracle数据库11g企业版 11.2.0.4.0 - 64位生产版)中遇到了一个非常奇怪的行为。

问题

问题是:我入队了X条消息,但dequeue_array却返回了X + 1条消息,其中第一条消息被重复返回(从MessageId可以看出)。

重现:

我能够编写一些简单的PoC来重现错误。 这个代码非常简单直接,入队/出队的东西是标准的Oracle AQ。 该代码执行以下步骤两次(测试运行):

  • 清除队列表
  • 入队X消息
  • 使用dbms_aq.dequeue_array调用将所有消息出列
  • 检查有多少消息已经出队
  • 在全新连接上运行PoC时,第一次测试运行成功、没有错误,但之后的每次运行都失败。 此后继续使用同一连接时,每次执行脚本,两次测试运行都会失败。

    我到目前为止的尝试:

  • 在“新鲜”连接上运行此脚本:只有第一次运行成功
  • 在同一连接上执行脚本的任何其他操作:所有运行都失败
  • 在使用/创建新队列时:只有第一次运行成功
  • 当使用“从<queue_table>删除”而不是dbms_aqadm.purge_queue_table()时:一切正常
  • 当使用“正常的”逐条出队时:一切正常
  • 结论:

    我既无法解释这种行为,也没能在我的代码中找到错误。 请看一下这段代码,它应该可以在您常用的SQL客户端中直接执行(已使用PL/SQL Developer测试)。

    如果您需要任何进一步的信息或让PoC工作时遇到问题,只需询问,我会定期检查该线程。 我试图让PoC尽可能易读,其中包括关于正在发生的事情的详细输出。

    代码:

    declare
       -- Name of the AQ queue that every test run enqueues to and dequeues from.
       C_QueueName        constant varchar2(32767) := 'TEST_QUEUE';
       -- Name of the queue table backing C_QueueName.
       C_QueueTable       constant varchar2(32767) := 'TEST_Q_TABLE';
       -- Number of messages enqueued per test run; also the expected dequeue count.
       C_MsgCount         constant pls_integer := 1;
       -- Number of purge/enqueue/dequeue cycles executed by RunTestCase.
       C_TestRuns         constant pls_integer := 2;
       -- Value passed as array_size to dbms_aq.dequeue_array.
       C_DequeueArraySize constant pls_integer := 10;
    
       /*
        * Creates the queue table and the queue used for these tests, then
        * starts the queue.
        * Returns without creating anything when the current schema already
        * contains an object of type QUEUE named C_QueueName.
       */
       procedure CreateQueueIfMissing is
          L_Present pls_integer;
       begin
          dbms_output.put_line('START CreateQueueIfMissing');

          -- USER_OBJECTS is a fixed dictionary view, so no dynamic SQL is needed:
          -- static SQL passes C_QueueName as a bind value instead of splicing it
          -- into the statement text.
          select count(*)
            into L_Present
            from USER_OBJECTS
           where OBJECT_NAME = C_QueueName
             and OBJECT_TYPE = 'QUEUE';

          if L_Present = 1 then
             dbms_output.put_line('Skipping queue creation, already present.');
             dbms_output.put_line('END CreateQueueIfMissing');
             return;
          end if;

          -- The queue table must exist before the queue can be created on it.
          dbms_output.put_line('  Creating queue table ' || C_QueueTable);
          DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => C_QueueTable
             ,storage_clause     => 'LOGGING NOCACHE NOPARALLEL MONITORING'
             ,sort_list          => 'priority,enq_time'
             ,multiple_consumers => false
             ,queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE'
             ,comment            => 'Queue for messages');

          dbms_output.put_line('  Creating queue ' || C_QueueName);
          DBMS_AQADM.CREATE_QUEUE(queue_name => C_QueueName
             ,queue_table => C_QueueTable
             ,max_retries => 8640
             ,retry_delay => 30
             ,comment => 'Queue for messages');

          -- START_QUEUE is called with its default options.
          dbms_output.put_line('  Starting queue ' || C_QueueName);
          DBMS_AQADM.START_QUEUE(queue_name => C_QueueName);
          dbms_output.put_line('END CreateQueueIfMissing');
       end CreateQueueIfMissing;
       -- ================================================================================================
    
    
       /*
        * Empties the queue table via dbms_aqadm.purge_queue_table and prints
        * the number of rows in the table before and after the purge.
        *
        * Observation from the test runs: the duplicate-dequeue error only shows
        * up when the table is emptied with purge_queue_table; a plain
        * "delete from <queue_table>" does not trigger it.
       */
       procedure CleanQueueTable is
          L_Opts dbms_aqadm.aq$_purge_options_t;

          -- Counts the rows of the queue table and prints the count after P_Label.
          -- Dynamic SQL is used for the count because the table name is only
          -- available as a string constant.
          procedure ReportRowCount(P_Label in varchar2) is
             L_Rows pls_integer;
          begin
             execute immediate 'select count(*) from ' || C_QueueTable into L_Rows;
             dbms_output.put_line(P_Label || L_Rows);
          end ReportRowCount;
       begin
          dbms_output.put_line('START CleanQueueTable');

          ReportRowCount('  Messages in queue table BEFORE purge: ');

          -- A null purge condition together with default-initialised options.
          dbms_aqadm.purge_queue_table(
              queue_table     => C_QueueTable
             ,purge_condition => null
             ,purge_options   => L_Opts);

          ReportRowCount('  Messages in queue table AFTER purge: ');

          dbms_output.put_line('END CleanQueueTable');
       end CleanQueueTable;
       -- ================================================================================================
    
    
       /*
        * Builds C_MsgCount JMS bytes messages and enqueues each of them on
        * C_QueueName. Prints the row count of the queue table before and after.
       */
       procedure EnqueueMessages is
          L_EnqOpts   DBMS_AQ.ENQUEUE_OPTIONS_T;    -- left at its defaults
          L_Props     DBMS_AQ.MESSAGE_PROPERTIES_T; -- left at its defaults
          L_Message   sys.aq$_jms_bytes_message;
          L_Body      pls_integer;                  -- value returned by clear_body
          L_NewMsgId  raw(16);                      -- msgid returned by ENQUEUE, not used further
          L_Rows      pls_integer;
       begin
          dbms_output.put_line('START EnqueueMessages');

          execute immediate 'select count(*) from ' || C_QueueTable into L_Rows;
          dbms_output.put_line('  Messages in queue table BEFORE enqueue: ' || L_Rows);

          for L_No in 1 .. C_MsgCount
          loop
             dbms_output.put_line('    Construct #' || L_No);
             L_Message := sys.aq$_jms_bytes_message.construct;

             -- JMS header fields
             L_Message.set_type('JmsBytesMessage');
             L_Message.set_userid(1);
             L_Message.set_appid('test');
             L_Message.set_groupid('cs');
             L_Message.set_groupseq(1);

             -- JMS body: write the payload bytes, then flush and clean the body
             L_Body := L_Message.clear_body(-1);
             L_Message.write_bytes(L_Body, to_blob(utl_raw.cast_to_raw('<test>Lorem Ipsum</test>')));
             L_Message.flush(L_Body);
             L_Message.clean(L_Body);

             dbms_output.put_line('    Enqueue #' || L_No);
             DBMS_AQ.ENQUEUE(
                 queue_name         => C_QueueName
                ,enqueue_options    => L_EnqOpts
                ,message_properties => L_Props
                ,payload            => L_Message
                ,msgid              => L_NewMsgId);
          end loop;

          execute immediate 'select count(*) from ' || C_QueueTable into L_Rows;
          dbms_output.put_line('  Messages in queue table AFTER enqueue: ' || L_Rows);
          dbms_output.put_line('END EnqueueMessages');
       end EnqueueMessages;
       -- ================================================================================================
    
    
       /*
        * Dequeues up to C_DequeueArraySize messages from C_QueueName with one
        * dbms_aq.dequeue_array call and compares the returned count with
        * C_MsgCount. On a mismatch, the message id of every returned entry is
        * printed.
       */
       procedure DequeueMessages is
          L_DeqOpts   dbms_aq.dequeue_options_t; -- left at its defaults
          L_Props     dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
          L_Payloads  sys.aq$_jms_bytes_messages;
          L_Ids       dbms_aq.msgid_array_t;

          L_Received  pls_integer := 0;          -- return value of dequeue_array
          L_Rows      pls_integer;
       begin
          dbms_output.put_line('START DequeueMessages');

          execute immediate 'select count(*) from ' || C_QueueTable into L_Rows;
          dbms_output.put_line('  Messages in queue table BEFORE dequeue: ' || L_Rows);

          L_Received := dbms_aq.dequeue_array(
              queue_name               => C_QueueName
             ,dequeue_options          => L_DeqOpts
             ,array_size               => C_DequeueArraySize
             ,message_properties_array => L_Props
             ,payload_array            => L_Payloads
             ,msgid_array              => L_Ids);

          execute immediate 'select count(*) from ' || C_QueueTable into L_Rows;
          dbms_output.put_line('  Messages in queue table AFTER dequeue: ' || L_Rows);

          dbms_output.put_line('  Expected: ' || C_MsgCount || ', Received: ' || L_Received);
          if L_Received <> C_MsgCount then
             dbms_output.put_line('  *****************************************');
             dbms_output.put_line('  TOO MANY ITEMS DEQUEUED?!?');
             dbms_output.put_line('  *****************************************');
             -- List the returned message ids so repeated ids become visible.
             for L_Pos in 1 .. L_Received
             loop
                dbms_output.put_line('    #' || L_Pos || ' MsdId=' || L_Ids(L_Pos));
             end loop;
          end if;
          dbms_output.put_line('END DequeueMessages');
       end DequeueMessages;
       -- ================================================================================================
    
       /*
        * Test driver: makes sure the queue exists, then repeats the
        * purge -> enqueue -> dequeue cycle C_TestRuns times.
       */
       procedure RunTestCase is
          L_Run pls_integer := 1;
       begin
          CreateQueueIfMissing;

          while L_Run <= C_TestRuns
          loop
             -- Empty line separating the output of consecutive runs.
             dbms_output.put_line(null);
             dbms_output.put_line('=========== START test run #' || L_Run || '===========');
             CleanQueueTable;
             EnqueueMessages;
             DequeueMessages;
             L_Run := L_Run + 1;
          end loop;
       end RunTestCase;
       -- ================================================================================================
    begin
       -- Entry point of the anonymous block: execute the complete test case.
       RunTestCase;
    end;
    

    示例输出:

    START CreateQueueIfMissing
    Skipping queue creation, already present.
    END CreateQueueIfMissing
    
    =========== START test run #1===========
    START CleanQueueTable
      Messages in queue table BEFORE purge: 0
      Messages in queue table AFTER purge: 0
    END CleanQueueTable
    START EnqueueMessages
      Messages in queue table BEFORE enqueue: 0
        Construct #1
        Enqueue #1
      Messages in queue table AFTER enqueue: 1
    END EnqueueMessages
    START DequeueMessages
      Messages in queue table BEFORE dequeue: 1
      Messages in queue table AFTER dequeue: 0
      Expected: 1, Received: 1
    END DequeueMessages
    
    =========== START test run #2===========
    START CleanQueueTable
      Messages in queue table BEFORE purge: 0
      Messages in queue table AFTER purge: 0
    END CleanQueueTable
    START EnqueueMessages
      Messages in queue table BEFORE enqueue: 0
        Construct #1
        Enqueue #1
      Messages in queue table AFTER enqueue: 1
    END EnqueueMessages
    START DequeueMessages
      Messages in queue table BEFORE dequeue: 1
      Messages in queue table AFTER dequeue: 0
      Expected: 1, Received: 2
      *****************************************
      TOO MANY ITEMS DEQUEUED?!?
      *****************************************
        #1 MsdId=2949A0FF2EE456A7E0540010E0467A30
        #2 MsdId=2949A0FF2EE456A7E0540010E0467A30
    END DequeueMessages
    

    这看起来像是Bug 20659700。 文档2002148.1中有更多信息。

    您(或您的DBA)应提出服务请求以确认该情况,并查看您的平台是否有可用的修补程序。

    链接地址: http://www.djcxy.com/p/76427.html

    上一篇: oracle

    下一篇: Simulate a scenario where message goes into exception queue