rabbitmq建立簡單的生產和消費者
阿新 • • 發佈:2019-01-06
參考《rabbitmq實戰指南》
1、首先專案中引入rabbit-client jar包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.4.2</version>
</dependency>
2、建立連線rabbitmq伺服器的連線
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitConnection { private final static String HOST = "192.168.10.59"; private final static String VIRTUALHOST = "/"; private final static int PORT = 5672; private final static String USERNAME = "it"; private final static String PASSWORD = "its123"; /** * 獲取connection * */ public static Connection createConnection() { Connection conn = null; //建立connection工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setVirtualHost(VIRTUALHOST); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); try { conn = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return conn; } }
3、建立生產者,需要先執行下,然後在啟動佇列,去繫結這個exchange:
import java.io.IOException; import java.util.Random; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * <p> * *生產訊息 * * </p> * @author hz16092620 * @date 2018年9月14日 下午3:20:57 * @version */ public class RabbitProducter { public static void main(String[] args) { createExchange(); } /** * 交換器傳送資料 * */ public static void createExchange() { Connection conn = RabbitConnection.createConnection(); Channel channel = null; try { channel = conn.createChannel(); channel.exchangeDeclare("liuhp_exchange", "direct", true);//true表示持久化的 //channel.queueBind("liuhp_quene", "liuhp_exchange", "rabbit_test_routing_key"); //傳送訊息 for (int i = 0; i < 10; i++) { channel.basicPublish("liuhp_exchange", "rabbit_test_routing_key", null, String.valueOf(new Random().nextInt(100)).getBytes()); } } catch (IOException e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
4、建立消費者 ,rabbitmq伺服器先有liuhp_exchange:
import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; /** * <p> * * * * </p> * @author hz16092620 * @date 2018年9月14日 下午3:22:20 * @version */ public class RabbitConsumer { public static void main(String[] args) { consumerMessage(); } /** * 消費訊息 * */ public static void consumerMessage() { Connection conn = RabbitConnection.createConnection(); try { String queneName = "liuhp_quene"; final Channel channel = conn.createChannel(); channel.queueDeclare(queneName, false, true, false, null); channel.queueBind(queneName, "liuhp_exchange", "rabbit_test_routing_key"); //消費訊息,推模式 channel.basicQos(100); channel.basicConsume(queneName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { /*System.out.println(consumerTag); System.out.println(envelope.getDeliveryTag()); System.out.println(envelope.getExchange()); System.out.println(envelope.getRoutingKey());*/ StringBuilder sb = new StringBuilder(); for (byte b : body) { sb.append((char) b); } System.out.println(sb.toString()); //channel.basicAck(envelope.getDeliveryTag(), false);//這個可以確認是否處理訊息 } }); /*try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { conn.close(); } catch (IOException e) { e.printStackTrace(); }*/ } catch (IOException e) { e.printStackTrace(); } finally { } } }
connetion和channel先不關閉,讓其有時間消費訊息 ,或者加個Thread.sleep(30000L)。