1. 程式人生 > >ActiveMQ 配置jdbc主從

ActiveMQ 配置jdbc主從

AD distrib cda 分享 troy lur temp ont conf

使用 jdbc 方式配置主從模式,持久化消息存放在數據庫中。

在同一時刻,只有一個 master broker,master 接受客戶端的連接,slave 不接受連接。
當 master 因為關機而下線後,其中一個 slave 會提升為 master,然後接受客戶端連接。但原來 master 的非持久消息丟失了,而持久消息保存在數據庫中。

broker xml 配置:使用 MySQL 數據源

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
   
    http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
--> <!-- Use JDBC for message persistence For more information, see: http://activemq.apache.org/persistence.html You need to add Derby database to your classpath in order to make this example work. Download it from http://db.apache.org/derby/ and put it in the ${ACTIVEMQ_HOME}/lib/optional/ folder Optionally you can configure any other RDBM as shown below To run ActiveMQ with this configuration add xbean:conf/activemq-jdbc.xml to your command e.g. $ bin/activemq xbean:conf/activemq-jdbc.xml
--> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"
> <broker useJmx="false" brokerName="jdbcBroker" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> </persistenceAdapter> <transportConnectors> <transportConnector name="default" uri="tcp://0.0.0.0:61616"/> </transportConnectors> </broker> <!-- Embedded Derby DataSource Sample Setup --> <!-- <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource"> <property name="databaseName" value="derbydb"/> <property name="createDatabase" value="create"/> </bean> --> <!-- Postgres DataSource Sample Setup --> <!-- <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource"> <property name="serverName" value="localhost"/> <property name="databaseName" value="activemq"/> <property name="portNumber" value="0"/> <property name="user" value="activemq"/> <property name="password" value="activemq"/> <property name="dataSourceName" value="postgres"/> <property name="initialConnections" value="1"/> <property name="maxConnections" value="10"/> </bean> --> <!-- MySql DataSource Sample Setup --> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.40.8:3306/db_zhang?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> <!-- Oracle DataSource Sample Setup --> <!-- <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/> <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/> <property name="username" value="scott"/> <property name="password" value="tiger"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> --> </beans>

客戶端配置,producer 和 consumer 是一樣的:

new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61618)");

以 MySQL 數據庫為例,啟動 broker 後,MySQL 中會創建 3 張表:
ACTIVEMQ_MSGS 存放持久消息
ACTIVEMQ_LOCK 表中只有一條記錄,broker 執行 SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE 獲取鎖,獲得鎖的 broker 是 master
ACTIVEMQ_ACKS

broker 獲取鎖的調用棧如下:

技術分享圖片

// org.apache.activemq.store.jdbc.DefaultDatabaseLocker
public void doStart() throws Exception {

    LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
    // SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE
    String sql = getStatements().getLockCreateStatement();
    LOG.debug("Locking Query is "+sql);
    
    while (true) {
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            lockCreateStatement = connection.prepareStatement(sql);
            // 執行 SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE
            // 如果成功,則跳出循環。如果超時,則拋異常
            lockCreateStatement.execute();
            break;
        } catch (Exception e) {
            try {
                if (isStopping()) {
                    throw new Exception(
                            "Cannot start broker as being asked to shut down. " 
                                    + "Interrupted attempt to acquire lock: "
                                    + e, e);
                }
                if (exceptionHandler != null) {
                    try {
                        exceptionHandler.handle(e);
                    } catch (Throwable handlerException) {
                        LOG.error( "The exception handler "
                                + exceptionHandler.getClass().getCanonicalName()
                                + " threw this exception: "
                                + handlerException
                                + " while trying to handle this exception: "
                                + e, handlerException);
                    }

                } else {
                    LOG.debug("Lock failure: "+ e, e);
                }
            } finally {
                // Let‘s make sure the database connection is properly
                // closed when an error occurs so that we‘re not leaking
                // connections 
                if (null != connection) {
                    try {
                        connection.rollback();
                    } catch (SQLException e1) {
                        LOG.debug("Caught exception during rollback on connection: " + e1, e1);
                    }
                    try {
                        connection.close();
                    } catch (SQLException e1) {
                        LOG.debug("Caught exception while closing connection: " + e1, e1);
                    }
                    
                    connection = null;
                }
            }
        } finally {
            if (null != lockCreateStatement) {
                try {
                    lockCreateStatement.close();
                } catch (SQLException e1) {
                    LOG.debug("Caught while closing statement: " + e1, e1);
                }
                lockCreateStatement = null;
            }
        }

        LOG.info("Failed to acquire lock.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
        try {
            Thread.sleep(lockAcquireSleepInterval);
        } catch (InterruptedException ie) {
            LOG.warn("Master lock retry sleep interrupted", ie);
        }
    }

    LOG.info("Becoming the master on dataSource: " + dataSource);
}

broker 執行 SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE 獲取鎖,獲取成功,則成為 master,如果失敗,則睡眠一段時間後,繼續獲取鎖。

超時而拋出的異常:

技術分享圖片

ActiveMQ 配置jdbc主從