Static Network Configuration in ActiveMQ
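A static networkConnector creates a fixed configuration that joins several brokers in a network into a cluster. The static protocol takes a composite URI, that is, a single URI that contains several broker URIs.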
<networkConnectors>
  <networkConnector name="local network" duplex="true"
                    uri="static://(tcp://192.168.174.104:61616,tcp://192.168.174.104:61676)"/>
</networkConnectors>
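The composite URI is just the list of broker addresses wrapped in static://( ... ). As a minimal sketch, the helper below assembles that string from a list of addresses. The class name StaticNetworkUri and the method build are hypothetical names chosen for illustration; they are not part of ActiveMQ.

```java
import java.util.List;

/** Illustrative helper (not an ActiveMQ class): builds a static composite URI. */
class StaticNetworkUri {

    /** Joins the broker URIs into the form static://(uri1,uri2,...). */
    static String build(List<String> brokerUris) {
        if (brokerUris.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URI is required");
        }
        return "static://(" + String.join(",", brokerUris) + ")";
    }

    public static void main(String[] args) {
        // The two broker addresses used in this article
        String uri = build(List.of(
                "tcp://192.168.174.104:61616",
                "tcp://192.168.174.104:61676"));
        System.out.println(uri);
    }
}
```

The printed string can be pasted into the uri attribute of a networkConnector element.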
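Commonly used networkConnector attributes:
conduitSubscriptions: default true. Whether multiple consumers on the same broker are treated as a single consumer.
duplex: default false. Whether the connection carries messages in both directions.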
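To see why conduitSubscriptions matters for queues, the toy model below counts how many messages cross the bridge when a broker hands messages out round-robin over its subscriptions. It is a deliberately simplified sketch: the class ConduitModel is hypothetical, and real ActiveMQ dispatch also depends on prefetch limits and consumer speed, which are ignored here.

```java
/** Toy model (not ActiveMQ code) of round-robin dispatch across a network bridge. */
class ConduitModel {

    /**
     * Returns how many of the given messages are handed to the bridge.
     * With conduit=true the remote consumers count as one subscription;
     * with conduit=false each remote consumer counts separately.
     */
    static int forwardedToRemote(int messages, int localConsumers,
                                 int remoteConsumers, boolean conduit) {
        int bridgeSubs = remoteConsumers == 0 ? 0 : (conduit ? 1 : remoteConsumers);
        int total = localConsumers + bridgeSubs;
        if (total == 0) {
            return 0; // nobody to dispatch to
        }
        int forwarded = 0;
        for (int i = 0; i < messages; i++) {
            // Slots [0, localConsumers) are local consumers; the rest belong to the bridge
            if (i % total >= localConsumers) {
                forwarded++;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // 20 messages, 1 local consumer, 3 consumers on the remote broker
        System.out.println("conduit=true:  " + forwardedToRemote(20, 1, 3, true));
        System.out.println("conduit=false: " + forwardedToRemote(20, 1, 3, false));
    }
}
```

In this model the bridge receives 10 of 20 messages with conduitSubscriptions enabled and 15 of 20 with it disabled, because the three remote consumers are then weighted individually.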
Message sending code
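// javax.jms is assumed here (ActiveMQ 5.x client libraries)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsSend {
    public static void main(String[] args) throws JMSException {
        // Connect to broker1
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Transacted session: the acknowledge mode argument is ignored,
        // and the sends only take effect on commit()
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination queue = session.createQueue("my-queue4");
        MessageProducer producer = session.createProducer(queue);

        // Send 20 text messages: message0 .. message19
        for (int i = 0; i < 20; i++) {
            TextMessage message = session.createTextMessage("message" + i);
            // message.setStringProperty("queue", "queue" + i);
            // message.setJMSType("1");
            producer.send(message);
        }

        session.commit();
        session.close();
        connection.close();
    }
}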
Receiver test code for broker1 (192.168.174.104:61616)
// javax.jms is assumed here (ActiveMQ 5.x client libraries)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsReceiver1 {
    public static void main(String[] args) throws Exception {
        // Connect to broker1
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        // Start 10 consumer threads, one per second
        for (int i = 0; i < 10; i++) {
            new Myhread1(connectionFactory).start();
            Thread.sleep(1000);
        }
    }
}

class Myhread1 extends Thread {
    private ConnectionFactory connectionFactory;

    public Myhread1(ConnectionFactory connectionFactory) {
        super();
        this.connectionFactory = connectionFactory;
    }

    public void run() {
        try {
            final Connection connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: the acknowledge mode argument is ignored
            final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination queue = session.createQueue("my-queue4");
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("1======" + msg.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    // After the first message is handled, commit and then
                    // release the session and the connection
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Receiver test code for broker2 (192.168.174.104:61676)
// javax.jms is assumed here (ActiveMQ 5.x client libraries)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsReceiver2 {
    public static void main(String[] args) throws Exception {
        // Connect to broker2
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://192.168.174.104:61676");
        // Start 10 consumer threads, one per second
        for (int i = 0; i < 10; i++) {
            new Myhread2(connectionFactory).start();
            Thread.sleep(1000);
        }
    }
}

class Myhread2 extends Thread {
    private ConnectionFactory connectionFactory;

    public Myhread2(ConnectionFactory connectionFactory) {
        super();
        this.connectionFactory = connectionFactory;
    }

    public void run() {
        try {
            final Connection connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: the acknowledge mode argument is ignored
            final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination queue = session.createQueue("my-queue4");
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("2======" + msg.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    // After the first message is handled, commit and then
                    // release the session and the connection
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
"Lost" messages
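broker1 and broker2 are linked by a networkConnector. Some consumers connect to broker2 and consume messages that were sent to broker1. broker2 first consumes those messages from broker1 and then forwards them to its consumers. Suppose broker2 restarts while it is still forwarding some of them. The consumers see the connection to broker2 fail and reconnect to broker1 through failover, but some of the messages they have not yet consumed were already dispatched from broker1 to broker2. Those messages appear to have vanished.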
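my-queue4 on broker1 receives 20 messages.
broker1 is connected to broker2 through the static network. After the consumers attached to broker2 have consumed, Number of Pending Messages on broker1 is 0, which shows that broker2 consumed the messages from broker1 first.
Consumers that then connect to broker1 cannot get any messages from it.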
Handling "lost" messages with the replayWhenNoConsumers option
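With this option, when a broker holds messages that need to be forwarded but has no consumers for them, it replays the messages back to the broker they originally came from. enableAudit is set to false at the same time, so that replayed messages are not treated as duplicates and withheld from dispatch.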
<policyEntries>
  <policyEntry queue=">" enableAudit="false">
    <networkBridgeFilterFactory>
      <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
    </networkBridgeFilterFactory>
  </policyEntry>
</policyEntries>
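The effect of the option can be illustrated with a small in-memory model of the scenario described above. This is only a sketch under simplifying assumptions: ReplayModel and its methods are hypothetical, the two queues stand in for my-queue4 on each broker, and nothing here touches the ActiveMQ API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (not ActiveMQ code) of messages stranded on broker2 and replayed to broker1. */
class ReplayModel {
    final Deque<String> broker1 = new ArrayDeque<>();
    final Deque<String> broker2 = new ArrayDeque<>();
    private final boolean replayWhenNoConsumers;

    ReplayModel(boolean replayWhenNoConsumers) {
        this.replayWhenNoConsumers = replayWhenNoConsumers;
    }

    /** The producer sends messages to broker1. */
    void send(int count) {
        for (int i = 0; i < count; i++) {
            broker1.add("message" + i);
        }
    }

    /** broker2 has consumers, so the bridge drains broker1 into broker2. */
    void forwardToBroker2() {
        while (!broker1.isEmpty()) {
            broker2.add(broker1.poll());
        }
    }

    /** Consumers on broker2 take some messages before broker2 restarts. */
    void consumeOnBroker2(int count) {
        for (int i = 0; i < count && !broker2.isEmpty(); i++) {
            broker2.poll();
        }
    }

    /** Consumers fail over to broker1; broker2 is left with no local consumers. */
    void consumersFailOverToBroker1() {
        if (replayWhenNoConsumers) {
            // Replay the stranded messages back to the broker they came from
            while (!broker2.isEmpty()) {
                broker1.add(broker2.poll());
            }
        }
    }

    /** Runs the scenario: 20 sent, all forwarded, 5 consumed, then failover. */
    static ReplayModel run(boolean replay) {
        ReplayModel model = new ReplayModel(replay);
        model.send(20);
        model.forwardToBroker2();
        model.consumeOnBroker2(5);
        model.consumersFailOverToBroker1();
        return model;
    }

    public static void main(String[] args) {
        for (boolean replay : new boolean[] {false, true}) {
            ReplayModel m = run(replay);
            System.out.println("replay=" + replay
                    + " broker1=" + m.broker1.size()
                    + " broker2=" + m.broker2.size());
        }
    }
}
```

Without replay the 15 unconsumed messages stay on broker2, out of reach of the consumers that moved to broker1. With replay enabled they return to broker1, where those consumers can receive them.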