1. 程式人生 > >ActiveMQ的靜態網絡配置

ActiveMQ的靜態網絡配置

代碼 -a bubuko sub 消費 sum lock pub 失敗

static networkConnector是用於創建一個靜態的配置對於網絡中的多個Broker做集群,這種協議用於復合url,一個復合url包括多個url地址。

<networkConnectors>
             <networkConnector name="local network"  duplex="true"
              uri="static://(tcp://192.168.174.104:61616,tcp://192.168.174.104:61676)"/>
</networkConnectors>

常用networkConnector配置的可用屬性:

  conduitSubscriptions :默認true,是否把同一個broker的多個consumer當做一個來處理

  duplex :默認false,設置是否能雙向通信


消息發送代碼
技術分享圖片
public class JmsSend {
    
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection 
= connectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination queue=session.createQueue("my-queue4"); MessageProducer producer=session.createProducer(queue);
for(int i=0 ; i<20 ; i++){ TextMessage message=session.createTextMessage("message"+i); //message.setStringProperty("queue", "queue"+i); //message.setJMSType("1"); producer.send(message); } session.commit(); session.close(); connection.close(); } }
View Code

192.168.174.104:61616 broker1 接收測試代碼
技術分享圖片
public class JmsReceiver1 {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.174.104:61616");
        

        for (int i=0; i<10 ;i++){
            new Myhread1(connectionFactory).start();
            
            Thread.sleep(1000);
            
        }
        
        

    }

}

class Myhread1 extends Thread {
    
    private ConnectionFactory connectionFactory ;
    
    public Myhread1(ConnectionFactory connectionFactory) {
        super();
        this.connectionFactory = connectionFactory;
    }


    public void run() {
         
         
        try {
            
            final Connection connection = connectionFactory.createConnection();
            connection.start();
            
            
            final Session session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            
            Destination queue = session.createQueue("my-queue4");

            MessageConsumer consumer = session.createConsumer(queue);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("1======"+msg.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        session.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    
                }
            });
            
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }

}
View Code

192.168.174.104:61676 broker2 接收測試代碼
技術分享圖片
public class JmsReceiver2 {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.174.104:61676");
        

        for (int i=0; i<10 ;i++){
            new Myhread2(connectionFactory).start();
            
            Thread.sleep(1000);
            
        }
        
        

    }

}

class Myhread2 extends Thread {
    
    private ConnectionFactory connectionFactory ;
    
    public Myhread2(ConnectionFactory connectionFactory) {
        super();
        this.connectionFactory = connectionFactory;
    }


    public void run() {
         
        
        try {
            
            final Connection connection  = connectionFactory.createConnection();
            connection.start();
            
            
            final Session session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            
            Destination queue = session.createQueue("my-queue4");

            MessageConsumer consumer = session.createConsumer(queue);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("2======"+msg.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        session.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                }
            });
            
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }

}
View Code

“丟失”的消息

broker1和broker2通過networkConnector連接,一些consumers連接到broker2,消費broker1上的消息。消息先被broker2從broker1上消費掉,然後轉發給這些consumers。不幸的是轉發部分消息的時候broker2重啟了,這些consumers發現broker2連接失敗,通過failover連接到broker1上去了,但是有一部分他們還沒有消費的消息被broker1已經分發到了broker2上去了。這些消息,就好像是消失了。

broker1 中my-queue4 接收到20條消息。

技術分享圖片

broker1通過靜態網絡與broker2連接,與broker2相連的消費者消費後,broker1中Number of Pending Messages為0,即消息先被broker2從broker1上消費掉。

技術分享圖片

一些consumers連接到broker1,沒法從broker1獲取消息消費。

技術分享圖片

針對“丟失”的消息,配置replayWhenNoConsumers選項

這個選項使得broker1上有需要轉發的消息但是沒有消費者時,把消息回流到它原始的broker。同時把enableAudit設置為false,為了防止消息回流後被當做重復消息而不被分發。

<policyEntries>
        <policyEntry queue=">" enableAudit="false">
                <networkBridgeFilterFactory>
                        <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                </networkBridgeFilterFactory>
        </policyEntry>
</policyEntries>

ActiveMQ的靜態網絡配置