1. 程式人生 > >amqp的Java例子程式碼解析---生產者

AMQP 的 Java 範例程式碼解析——生產者

/**  * Licensed to the Apache Software Foundation (ASF) under one or more  * contributor license agreements.  See the NOTICE file distributed with  * this work for additional information regarding copyright ownership.  * The ASF licenses this file to You under the Apache License, Version 2.0  * (the "License"); you may not use this file except in compliance with  * the License.  You may obtain a copy of the License at  *  *      http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */ package example;

// Import the Qpid JMS (AMQP) client classes and the standard JMS API

import org.apache.qpid.jms.*; import javax.jms.*;

// Producer (publisher) example

class Publisher {

    public static void main(String[] args) throws Exception {

        final String TOPIC_PREFIX = "topic://";

       //定義登入AMQ的使用者名稱、密碼、ip地址、埠號

        String user = env("ACTIVEMQ_USER", "admin");         String password = env("ACTIVEMQ_PASSWORD", "password");         String host = env("ACTIVEMQ_HOST", "localhost");         int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672"));

        //建立連線串

        String connectionURI = "amqp://" + host + ":" + port;         String destinationName = arg(args, 0, "topic://event");

        int messages = 10000;         int size = 256;

        //下面是沒用的程式碼

        String DATA = "abcdefghijklmnopqrstuvwxyz";         String body = "";         for (int i = 0; i < size; i++) {             body += DATA.charAt(i % DATA.length());         }

       //建立連線

        JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);

        Connection connection = factory.createConnection(user, password);         connection.start();

        //建立會話

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //建立主題或佇列,此處建立的是一個訊息佇列

        Destination destination = null;         if (destinationName.startsWith(TOPIC_PREFIX)) {             destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length()));         } else {             destination = session.createQueue(destinationName);         }

        //建立生產者

        MessageProducer producer = session.createProducer(destination);         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);       //設定生產者遞送模式

        //傳送10000條訊息,訊息內容是"#"加序號

        for (int i = 1; i <= messages; i++) {             TextMessage msg = session.createTextMessage("#:" + i);             msg.setIntProperty("id", i);             producer.send(msg);             if ((i % 1000) == 0) {                 System.out.println(String.format("Sent %d messages", i));             }         }

        //傳送完訊息的處理,關閉會話,關閉連線

        producer.send(session.createTextMessage("SHUTDOWN"));         Thread.sleep(1000 * 3);         connection.close();         System.exit(0);     }

    private static String env(String key, String defaultValue) {         String rc = System.getenv(key);         if (rc == null)             return defaultValue;         return rc;     }

    private static String arg(String[] args, int index, String defaultValue) {         if (index < args.length)             return args[index];         else             return defaultValue;     }

}