(二) RabbitMQ實戰教程(面向Java開發人員)之Rabbit Java Client
阿新 • • 發佈:2018-12-25
RabbitMQ Java Client
在介紹完RabbitMQ基本概念後,我們使用JAVA程式碼來模擬一套生產者和消費者的模型,Talk is cheap 直接上程式碼了。使用Java Client整合RabbitMQ需要在pom.xml中匯入如下依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency >
相關程式碼
1.建立連線工具類
public class ChannelUtils {
public static Channel getChannelInstance(String connectionDescription) {
try {
ConnectionFactory connectionFactory = getConnectionFactory();
Connection connection = connectionFactory.newConnection(connectionDescription);
return connection.createChannel();
} catch (Exception e) {
throw new RuntimeException("獲取Channel連線失敗");
}
}
private static ConnectionFactory getConnectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory();
// 配置連線資訊
connectionFactory.setHost("192.168.56.128" );
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("roberto");
connectionFactory.setPassword("roberto");
// 網路異常自動連線恢復
connectionFactory.setAutomaticRecoveryEnabled(true);
// 每10秒嘗試重試連線一次
connectionFactory.setNetworkRecoveryInterval(10000);
// 設定ConnectionFactory屬性資訊
Map<String, Object> connectionFactoryPropertiesMap = new HashMap();
connectionFactoryPropertiesMap.put("principal", "RobertoHuang");
connectionFactoryPropertiesMap.put("description", "RGP訂單系統V2.0");
connectionFactoryPropertiesMap.put("emailAddress", "[email protected]");
connectionFactory.setClientProperties(connectionFactoryPropertiesMap);
return connectionFactory;
}
}
2.建立訊息生產者
public class MessageProducer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = ChannelUtils.getChannelInstance("RGP訂單系統訊息生產者");
// 宣告交換機 (交換機名, 交換機型別, 是否持久化, 是否自動刪除, 是否是內部交換機, 交換機屬性);
channel.exchangeDeclare("roberto.order", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
// 設定訊息屬性 釋出訊息 (交換機名, Routing key, 可靠訊息相關屬性 後續會介紹, 訊息屬性, 訊息體);
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
channel.basicPublish("roberto.order", "add", false, basicProperties, "訂單資訊".getBytes());
}
}
3.建立訊息消費者
public class MessageConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = ChannelUtils.getChannelInstance("RGP訂單系統訊息消費者");
// 宣告佇列 (佇列名, 是否持久化, 是否排他, 是否自動刪除, 佇列屬性);
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare("roberto.order.add", true, false, false, new HashMap<>());
// 宣告交換機 (交換機名, 交換機型別, 是否持久化, 是否自動刪除, 是否是內部交換機, 交換機屬性);
channel.exchangeDeclare("roberto.order", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
// 將佇列Binding到交換機上 (佇列名, 交換機名, Routing key, 繫結屬性);
channel.queueBind(declareOk.getQueue(), "roberto.order", "add", new HashMap<>());
// 消費者訂閱訊息 監聽如上宣告的佇列 (佇列名, 是否自動應答(與訊息可靠有關 後續會介紹), 消費者標籤, 消費者)
channel.basicConsume(declareOk.getQueue(), true, "RGP訂單系統ADD處理邏輯消費者", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println(envelope.toString());
System.out.println(properties.toString());
System.out.println("訊息內容:" + new String(body));
}
});
}
}
4.依次啟動訊息消費者和訊息生產者,控制檯列印
RGP訂單系統ADD處理邏輯消費者
Envelope(deliveryTag=1, redeliver=false, exchange=roberto.order, routingKey=add)
#contentHeader<basic>(content-type=UTF-8, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
訊息內容:訂單資訊
注意:初次執行時一定要先執行消費者後執行生產者,因為初次執行時還沒有宣告佇列和交換機的繫結關係,如果先啟動生產者會導致訊息無法被正確路由而被丟棄
控制檯配置
執行上訴程式碼後,開啟RabbitMQ管理控制檯介面看到配置如下
連線屬性配置:
交換機屬性配置:
佇列屬性配置:
將管理控制檯屬性與上訴程式碼結合起來會更有利於讀者理解每行程式碼的實際意義