1. 程式人生 > >ActiveMQ中資料表SQL的管理

ActiveMQ中資料表SQL的管理

  在ActiveMQ為了方便的切換資料庫,更為了深入瞭解ActiveMQ中SQL語句的詳細的資訊,可以通過Statements獲取各種SQL語句。在ActivMQ第一次載入的時候,通過Statements生產響應的Spring Bean,載入到記憶體中。在ActiveMQ的管理和監控的時候,從內容獲取相關的SQL語句,簡化了ActiveMQ的訊息的管理和監控。

Java程式碼  收藏程式碼
  1. package com.easyway.activemq.proxy;  
  2. import java.lang.reflect.InvocationTargetException;  
  3. import java.lang.reflect.Method;  
  4. import java.util.ArrayList;  
  5. import java.util.regex.Pattern;  
  6. import org.apache.activemq.store.jdbc.Statements;  
  7. /** 
  8.  * 獲取ActiveMQ中SQL相關的各種語句 
  9.  * @author longgangbai 
  10.  * 
  11.  */  
  12. public class ActiveMQSQLManager {  
  13.      /** 
  14.       *  
  15.       * @param statement 
  16.       * @return 
  17.       */  
  18.      public static String returnStatement(Object statement){  
  19.             return ((String)statement).replace("<""&lt;").replace(">""&gt;");  
  20.         }  
  21.         /** 
  22.          *  
  23.          * @param args 
  24.          */  
  25.         public static void main(String[] args) throws Exception{  
  26.             createSQLForActiveMQ();  
  27.         }  
  28.         /** 
  29.          * 建立ActiveMQ中語句
     
  30.          * @throws IllegalAccessException 
  31.          * @throws InvocationTargetException 
  32.          */  
  33.         private static void createSQLForActiveMQ() throws IllegalAccessException, InvocationTargetException {  
  34.             Statements s=new Statements();  
  35.             //設定表的字首  
  36.             s.setTablePrefix("ACTIVEMQ.");  
  37.             //獲取Statements中建立的語句的集合  
  38.             String[] stats=s.getCreateSchemaStatements();  
  39.             //構建Statements的Bean  
  40.             System.out.println("<bean id=\"statements\" class=\"org.apache.activemq.store.jdbc.Statements\">");  
  41.             createSQLForTableIndex(stats);  
  42.             createSQLForActiveMQBussine(s);  
  43.             //end of generating because of typo  
  44.             createSQLOfDropTable(s);  
  45.             System.out.println("</bean>");  
  46.         }  
  47.         /** 
  48.          * 建立ActiveMQ監控和管理使用的SQL語句 
  49.          * @param s 
  50.          * @throws IllegalAccessException 
  51.          * @throws InvocationTargetException 
  52.          */  
  53.         private static void createSQLForActiveMQBussine(Statements s) throws IllegalAccessException, InvocationTargetException {  
  54.             //獲取Statements的方法  
  55.             Method[] methods=Statements.class.getMethods();  
  56.             //  
  57.             Pattern sPattern= Pattern.compile("get.*Statement$");  
  58.             Pattern setPattern= Pattern.compile("set.*Statement$");  
  59.             //獲取儲存Set方法  
  60.             ArrayList<String> setMethods=new ArrayList<String>();  
  61.             for(int i=0; i<methods.length;i++){  
  62.                 if(setPattern.matcher(methods[i].getName()).find()){  
  63.                     setMethods.add(methods[i].getName());  
  64.                 }  
  65.             }  
  66.             //  
  67.             for(int i=0; i<methods.length;i++){  
  68.                 if(sPattern.matcher(methods[i].getName()).find()&&setMethods.contains(methods[i].getName().replace("get","set"))){  
  69.                     System.out.println("<property name=\""+methods[i].getName().substring(3,4).toLowerCase()+methods[i].getName().substring(4)+"\" value=\""+returnStatement(methods[i].invoke(s, (Object[])null))+"\" />");  
  70.                 }  
  71.             }  
  72.             //for a typo is not needed if removeMessageStatment typo is corrected  
  73.             //獲取get的方法  
  74.             Pattern sPattern2= Pattern.compile("get.*Statment$");  
  75.             for(int i=0; i<methods.length;i++){  
  76.                 if(sPattern2.matcher(methods[i].getName()).find()){  
  77.                     System.out.println("<property name=\""+methods[i].getName().substring(3,4).toLowerCase()+methods[i].getName().substring(4)+"\" value=\""+returnStatement(methods[i].invoke(s, (Object[])null))+"\" />");  
  78.                 }  
  79.             }  
  80.         }  
  81.         /** 
  82.          * 建立表的建立語句和索引語句 
  83.          * @param stats 
  84.          */  
  85.         private static void createSQLForTableIndex(String[] stats) {  
  86.             //獲取建立表和索引的時候的Schema語句  
  87.             System.out.println("<property name=\"createSchemaStatements\">");  
  88.             System.out.println("<list>");  
  89.             for(int i=0; i<stats.length;i++){  
  90.                 System.out.println("<value>"+stats[i]+"</value>");  
  91.             }  
  92.             System.out.println("</list>");  
  93.             System.out.println("</property>");  
  94.         }  
  95.         /** 
  96.          * 獲取刪除語句 
  97.          * @param s 
  98.          */  
  99.         private static void createSQLOfDropTable(Statements s) {  
  100.             //構建刪除語句  
  101.             String[] statsDrop=s.getDropSchemaStatements();  
  102.             System.out.println("<property name=\"dropSchemaStatements\">");  
  103.             System.out.println("<list>");  
  104.             for(int i=0; i<statsDrop.length;i++){  
  105.                 System.out.println("<value>"+statsDrop[i]+"</value>");  
  106.             }  
  107.             System.out.println("</list>");  
  108.             System.out.println("</property>");  
  109.         }  
  110. }  

生成的spring bean如下:

Java程式碼  收藏程式碼
  1. <bean id="statements" class="org.apache.activemq.store.jdbc.Statements">  
  2. <property name="createSchemaStatements">  
  3. <list>  
  4. <value>CREATE TABLE ACTIVEMQ.ACTIVEMQ_MSGS(ID BIGINT NOT NULL, CONTAINER VARCHAR(250), MSGID_PROD VARCHAR(250), MSGID_SEQ BIGINT, EXPIRATION BIGINT, MSG BLOB, PRIMARY KEY ( ID ) )</value>  
  5. <value>CREATE INDEX ACTIVEMQ.ACTIVEMQ_MSGS_MIDX ON ACTIVEMQ.ACTIVEMQ_MSGS (MSGID_PROD,MSGID_SEQ)</value>  
  6. <value>CREATE INDEX ACTIVEMQ.ACTIVEMQ_MSGS_CIDX ON ACTIVEMQ.ACTIVEMQ_MSGS (CONTAINER)</value>  
  7. <value>CREATE INDEX ACTIVEMQ.ACTIVEMQ_MSGS_EIDX ON ACTIVEMQ.ACTIVEMQ_MSGS (EXPIRATION)</value>  
  8. <value>CREATE TABLE ACTIVEMQ.ACTIVEMQ_ACKS(CONTAINER VARCHAR(250) NOT NULL, SUB_DEST VARCHAR(250), CLIENT_ID VARCHAR(250) NOT NULL, SUB_NAME VARCHAR(250) NOT NULL, SELECTOR VARCHAR(250), LAST_ACKED_ID BIGINT, PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))</value>  
  9. <value>CREATE TABLE ACTIVEMQ.ACTIVEMQ_LOCK( ID BIGINT NOT NULL, TIME BIGINT, BROKER_NAME VARCHAR(250), PRIMARY KEY (ID) )</value>  
  10. <value>INSERT INTO ACTIVEMQ.ACTIVEMQ_LOCK(ID) VALUES (1)</value>  
  11. <value>ALTER TABLE ACTIVEMQ.ACTIVEMQ_MSGS ADD PRIORITY BIGINT</value>  
  12. <value>CREATE INDEX ACTIVEMQ.ACTIVEMQ_MSGS_PIDX ON ACTIVEMQ.ACTIVEMQ_MSGS (PRIORITY)</value>  
  13. <value>ALTER TABLE ACTIVEMQ.ACTIVEMQ_ACKS ADD PRIORITY BIGINT DEFAULT 5 NOT NULL</value>  
  14. <value>ALTER TABLE ACTIVEMQ.ACTIVEMQ_ACKS DROP PRIMARY KEY</value>  
  15. <value>ALTER TABLE ACTIVEMQ.ACTIVEMQ_ACKS ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)</value>  
  16. </list>  
  17. </property>  
  18. <property name="addMessageStatement" value="INSERT INTO ACTIVEMQ.ACTIVEMQ_MSGS(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, ?)" />  
  19. <property name="updateMessageStatement" value="UPDATE ACTIVEMQ.ACTIVEMQ_MSGS SET MSG=? WHERE ID=?" />  
  20. <property name="findMessageSequenceIdStatement" value="SELECT ID, PRIORITY FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?" />  
  21. <property name="findMessageStatement" value="SELECT MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE MSGID_PROD=? AND MSGID_SEQ=?" />  
  22. <property name="findMessageByIdStatement" value="SELECT MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE ID=?" />  
  23. <property name="findAllMessagesStatement" value="SELECT ID, MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE CONTAINER=? ORDER BY ID" />  
  24. <property name="findLastSequenceIdInMsgsStatement" value="SELECT MAX(ID) FROM ACTIVEMQ.ACTIVEMQ_MSGS" />  
  25. <property name="lastProducerSequenceIdStatement" value="SELECT MAX(MSGID_SEQ) FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE MSGID_PROD=?" />  
  26. <property name="findLastSequenceIdInAcksStatement" value="SELECT MAX(LAST_ACKED_ID) FROM ACTIVEMQ.ACTIVEMQ_ACKS" />  
  27. <property name="createDurableSubStatement" value="INSERT INTO ACTIVEMQ.ACTIVEMQ_ACKS(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST, PRIORITY) VALUES (?, ?, ?, ?, ?, ?, ?)" />  
  28. <property name="findDurableSubStatement" value="SELECT SELECTOR, SUB_DEST FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" />  
  29. <property name="findAllDurableSubsStatement" value="SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND PRIORITY=0" />  
  30. <property name="updateLastPriorityAckRowOfDurableSubStatement" value="UPDATE ACTIVEMQ.ACTIVEMQ_ACKS SET LAST_ACKED_ID=? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?" />  
  31. <property name="deleteSubscriptionStatement" value="DELETE FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" />  
  32. <property name="findAllDurableSubMessagesStatement" value="SELECT M.ID, M.MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS M, ACTIVEMQ.ACTIVEMQ_ACKS D  WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND M.CONTAINER=D.CONTAINER AND M.ID &gt; D.LAST_ACKED_ID ORDER BY M.PRIORITY DESC, M.ID" />  
  33. <property name="findDurableSubMessagesStatement" value="SELECT M.ID, M.MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS M, ACTIVEMQ.ACTIVEMQ_ACKS D  WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND M.CONTAINER=D.CONTAINER AND M.ID &gt; D.LAST_ACKED_ID AND M.ID &gt; ? ORDER BY M.ID" />  
  34. <property name="nextDurableSubscriberMessageStatement" value="SELECT M.ID, M.MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS M, ACTIVEMQ.ACTIVEMQ_ACKS D  WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND M.CONTAINER=D.CONTAINER AND M.ID &gt; ? ORDER BY M.ID " />  
  35. <property name="durableSubscriberMessageCountStatement" value="SELECT COUNT(*) FROM ACTIVEMQ.ACTIVEMQ_MSGS M, ACTIVEMQ.ACTIVEMQ_ACKS D  WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=? AND M.CONTAINER=D.CONTAINER      AND M.ID &gt;          ( SELECT LAST_ACKED_ID FROM ACTIVEMQ.ACTIVEMQ_ACKS           WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID           AND SUB_NAME=D.SUB_NAME )" />  
  36. <property name="findAllDestinationsStatement" value="SELECT DISTINCT CONTAINER FROM ACTIVEMQ.ACTIVEMQ_ACKS" />  
  37. <property name="removeAllMessagesStatement" value="DELETE FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE CONTAINER=?" />  
  38. <property name="removeAllSubscriptionsStatement" value="DELETE FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=?" />  
  39. <property name="deleteOldMessagesStatement" value="DELETE FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE ( EXPIRATION&lt;&gt;0 AND EXPIRATION&lt;?) OR (ID &lt;=      ( SELECT min(ACTIVEMQ.ACTIVEMQ_ACKS.LAST_ACKED_ID)       FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE ACTIVEMQ.ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ.ACTIVEMQ_MSGS.CONTAINER )   )" />  
  40. <property name="lockCreateStatement" value="SELECT * FROM ACTIVEMQ.ACTIVEMQ_LOCK FOR UPDATE" />  
  41. <property name="lockUpdateStatement" value="UPDATE ACTIVEMQ.ACTIVEMQ_LOCK SET TIME = ? WHERE ID = 1" />  
  42. <property name="destinationMessageCountStatement" value="SELECT COUNT(*) FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE CONTAINER=?" />  
  43. <property name="findNextMessagesStatement" value="SELECT ID, MSG FROM ACTIVEMQ.ACTIVEMQ_MSGS WHERE CONTAINER=? AND ID &gt; ? ORDER BY ID" />  
  44. <property name="lastAckedDurableSubscriberMessageStatement" value="SELECT MAX(LAST_ACKED_ID) FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" />  
  45. <property name="selectDurablePriorityAckStatement" value="SELECT LAST_ACKED_ID FROM ACTIVEMQ.ACTIVEMQ_ACKS WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY = ?" />  
  46. <property name="insertDurablePriorityAckStatement" value="INSERT INTO ACTIVEMQ.ACTIVEMQ_ACKS(CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY) VALUES (?, ?, ?, ?)" />  
  47. <property name="updateDurableLastAckStatement" value="UPDATE ACTIVEMQ.ACTIVEMQ_ACKS SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?" />  
  48. <property name="dropSchemaStatements">  
  49. <list>  
  50. <value>DROP TABLE ACTIVEMQ.ACTIVEMQ_ACKS</value>  
  51. <value>DROP TABLE ACTIVEMQ.ACTIVEMQ_MSGS</value>  
  52. <value>DROP TABLE ACTIVEMQ.ACTIVEMQ_LOCK</value>  
  53. </list>  
  54. </property>  
  55. </bean>