RabbitMQ基礎理解
RabbitMQ訊息中介軟體:
hello world模式:
不使用交換機(或者說使用了預設交換機),訊息生產者和訊息消費者通過Queue佇列關聯,生產者將訊息放入佇列,消費者取出進行消費。
1 package Queue; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException;9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 ConnectionFactory connectionFactory=new ConnectionFactory(); 14 15 connectionFactory.setHost("1.117.107.150"); 16 // connectionFactory.setConnectionTimeout(5000);17 connectionFactory.setPort(5672); 18 connectionFactory.setVirtualHost("/test"); 19 connectionFactory.setUsername("test"); 20 connectionFactory.setPassword("test"); 21 22 Connection connection = connectionFactory.newConnection(); 23 24 Channel channel = connection.createChannel();25 26 channel.queueDeclare("test",false,false,false,null); 27 28 for(int i=1;i<21;i++){ 29 channel.basicPublish("","test",null,(i+"hello").getBytes()); 30 } 31 32 33 channel.close(); 34 connection.close(); 35 } 36 }
1 package Queue; 2 3 import com.rabbitmq.client.*; 4 5 import java.io.IOException; 6 import java.util.concurrent.TimeoutException; 7 8 public class Customer1 { 9 10 public static void main(String[] args) throws IOException, TimeoutException { 11 ConnectionFactory connectionFactory=new ConnectionFactory(); 12 13 connectionFactory.setHost("1.117.107.150"); 14 // connectionFactory.setConnectionTimeout(5000); 15 connectionFactory.setPort(5672); 16 connectionFactory.setVirtualHost("/test"); 17 connectionFactory.setUsername("test"); 18 connectionFactory.setPassword("test"); 19 20 Connection connection = connectionFactory.newConnection(); 21 22 final Channel channel = connection.createChannel(); 23 24 channel.basicQos(1);//每次只接受一條訊息進行消費,否則將會把所有訊息放入佇列,如果消費過程中宕機,訊息將會丟失 25 channel.queueDeclare("test",false,false,false,null);//channel.queueDeclare(佇列名,是否持久化,是否獨佔佇列,是否自動刪除,其他引數) 26 27 channel.basicConsume("test",false,new DefaultConsumer(channel){//這裡的false決定不自動應答,採用手動應答。 28 @Override 29 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 30 System.out.println("new String = "+new String(body)); 36 channel.basicAck(envelope.getDeliveryTag(),false);//手動應答,告訴交換機,我佇列中的訊息處理完成。 37 } 38 }); 39 } 40 }
好處:結構單一,耦合度一般,容易理解。
壞處:吞吐量小,只適合小型專案。
其實就是一個簡單的生產者/消費者模式。
工作佇列模式:
不使用交換機(或者說使用了預設交換機),訊息生產者和訊息消費者通過Queue佇列關聯,生產者將訊息放入佇列,消費者取出進行消費,但與hello world不同的是,訊息消費者可以是多個,他們共同消費佇列中的訊息:預設以輪詢方式平均分配,設定basicQos(1)並採用手動應答後,則按處理速度分配(能者多勞)。
1 package Queue; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 ConnectionFactory connectionFactory=new ConnectionFactory(); 14 15 connectionFactory.setHost("1.117.107.150"); 16 // connectionFactory.setConnectionTimeout(5000); 17 connectionFactory.setPort(5672); 18 connectionFactory.setVirtualHost("/test"); 19 connectionFactory.setUsername("test"); 20 connectionFactory.setPassword("test"); 21 22 Connection connection = connectionFactory.newConnection(); 23 24 Channel channel = connection.createChannel(); 25 26 channel.queueDeclare("test",false,false,false,null); 27 28 for(int i=1;i<21;i++){ 29 channel.basicPublish("","test",null,(i+"hello").getBytes()); 30 } 31 32 33 channel.close(); 34 connection.close(); 35 } 36 }
1 package Queue; 2 3 import com.rabbitmq.client.*; 4 5 import java.io.IOException; 6 import java.util.concurrent.TimeoutException; 7 8 public class Customer1 { 9 10 public static void main(String[] args) throws IOException, TimeoutException { 11 ConnectionFactory connectionFactory=new ConnectionFactory(); 12 13 connectionFactory.setHost("1.117.107.150"); 14 // connectionFactory.setConnectionTimeout(5000); 15 connectionFactory.setPort(5672); 16 connectionFactory.setVirtualHost("/test"); 17 connectionFactory.setUsername("test"); 18 connectionFactory.setPassword("test"); 19 20 Connection connection = connectionFactory.newConnection(); 21 22 final Channel channel = connection.createChannel(); 23 24 channel.basicQos(1); 25 channel.queueDeclare("test",false,false,false,null); 26 27 channel.basicConsume("test",false,new DefaultConsumer(channel){ 28 @Override 29 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 30 try { 31 Thread.sleep(3000); 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 } 35 System.out.println("new String = "+new String(body)); 36 channel.basicAck(envelope.getDeliveryTag(),false); 37 } 38 }); 39 } 40 }
package Queue; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Customer2 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("1.117.107.150"); // connectionFactory.setConnectionTimeout(5000); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); connectionFactory.setUsername("test"); connectionFactory.setPassword("test"); Connection connection = connectionFactory.newConnection(); final Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare("test",false,false,false,null); channel.basicConsume("test",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//加入了睡眠時間模擬不同消費者處理訊息的速度,能者多勞,處理快的被分發的訊息多,反之就少 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
//因為2比1少睡眠1秒,所以處理的訊息比1多,手動應答後交換機就會給我一條新的訊息,忽略控制檯列印的時間,同樣6秒消費者2處理3條,消費者1處理兩條。 System.out.println("new String = "+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
好處:結構單一,耦合度一般,容易理解,吞吐量較hello world模式有所提升。
壞處:只適合訊息吞吐量一般的小型專案。
本質上還是生產者/消費者模式,只是由多個消費者共同消費同一個佇列:預設為輪詢平均分配,配合basicQos(1)與手動應答則為能者多勞。
Publish/Subscribe釋出訂閱模式:
使用了交換機,訊息生產者不再直接關聯Queue佇列,而是由exchange交換機負責訊息分發,訊息消費者依舊繫結Queue佇列,Queue繫結交換機,多個Queue可以繫結到同一個交換機。生產者不再關心訊息的去向,只負責將訊息放入交換機。
1 package Utils; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class RabbitMQUtils { 11 12 private static ConnectionFactory connectionFactory; 13 14 static { 15 connectionFactory = new ConnectionFactory(); 16 connectionFactory.setHost("1.117.107.150"); 17 connectionFactory.setPort(5672); 18 connectionFactory.setVirtualHost("/test"); 19 connectionFactory.setUsername("guest"); 20 connectionFactory.setPassword("guest"); 21 22 } 23 24 25 public static Connection getConnection(){ 26 try { 27 return connectionFactory.newConnection(); 28 } catch (IOException e) { 29 e.printStackTrace(); 30 } catch (TimeoutException e) { 31 e.printStackTrace(); 32 } 33 return null; 34 } 35 36 public static void close(Connection conn, Channel chnn) throws IOException, TimeoutException { 37 if (chnn!=null) chnn.close(); 38 if (conn!=null) conn.close(); 39 } 40 41 }
1 package Fanout; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import Utils.RabbitMQUtils; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 Connection connection = RabbitMQUtils.getConnection(); 14 15 Channel channel = connection.createChannel(); 16 17 channel.exchangeDeclare("logs", "Fanout"); 18 19 channel.basicPublish("logs","",null,"fanout message".getBytes()); 20 21 RabbitMQUtils.close(connection,channel); 22 } 23 }
package Fanout; import com.rabbitmq.client.*; import Utils.RabbitMQUtils; import java.io.IOException; public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "Fanout"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"logs",""); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
好處:生產者消費者進一步解耦,並且吞吐量有所提升。
壞處:只能進行廣播式訊息分發,不能將訊息特定的傳送給某一個佇列。
P/S又叫廣播模式,只要是訂閱了當前交換機的所有佇列,均可收到交換機發送的訊息。分發訊息時會指定一個路由key,但在此模式下,路由key會被忽略。
Routing路由--direct(直連):
使用了交換機,訊息分發時需要指定路由key,只有繫結到該交換機、並且繫結的路由key與訊息的路由key完全匹配的Queue佇列才能接收訊息;不匹配的佇列不會收到該訊息,若沒有任何佇列匹配,訊息會被交換機丟棄。
1 package Routing.Direct; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import Utils.RabbitMQUtils; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 Connection connection = RabbitMQUtils.getConnection(); 14 15 Channel channel = connection.createChannel(); 16 17 channel.exchangeDeclare("routing","direct"); 18 19 for(int i=1;i<22;i++){ 20 if(i % 2==0){ 21 channel.basicPublish("routing","black",null,(i+": my name is black").getBytes());//black為路由key 22 }else if(i % 3==0){ 23 channel.basicPublish("routing","green",null,(i+": my name is green").getBytes());//green為路由key 24 } 25 } 26 RabbitMQUtils.close(connection,channel); 27 28 } 29 }
package Routing.Direct; import com.rabbitmq.client.*; import Utils.RabbitMQUtils; import java.io.IOException; public class CustomerBlack { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("routing","direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"routing","black");//佇列的路由key channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
1 package Routing.Direct; 2 3 import com.rabbitmq.client.*; 4 import Utils.RabbitMQUtils; 5 6 import java.io.IOException; 7 8 public class CustomerGreen { 9 10 public static void main(String[] args) throws IOException { 11 Connection connection = RabbitMQUtils.getConnection(); 12 13 Channel channel = connection.createChannel(); 14 15 channel.exchangeDeclare("routing","direct"); 16 17 String queue = channel.queueDeclare().getQueue(); 18 19 channel.queueBind(queue,"routing","green");//佇列的路由key 20 21 channel.basicConsume(queue,true,new DefaultConsumer(channel){ 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 24 System.out.println(new String(body)); 25 } 26 }); 27 } 28 29 }
好處:可以根據路由key將訊息傳送到指定佇列,控制訊息的去向。
壞處:不夠靈活,只能路由key完全匹配的時候才能接收訊息。
Routing路由--topic(動態路由):
路由key支援 “*”與“#”兩個萬用字元,路由key由多個單詞組成,單詞之間使用“.”分隔開,單詞可以使用萬用字元代替,進行動態路由。“*”代表一個單詞,“#”代表0個、1個或者多個單詞。兩個“.”之間為一個單詞。例如:one.rabbitmq.two,name.rabbitmq.sex,email.rabbitmq.phone等都可以與【*.rabbitmq.*】相匹配,而face.rabbitmq.one.two則會匹配失敗。將路由規則修改為【#.rabbitmq.#】則one.rabbitmq.two,face.rabbitmq.one.two,one.two.rabbitmq.face,one.two.rabbitmq.three.face等都可以匹配成功。
1 package Routing.Topic; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import Utils.RabbitMQUtils; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 Connection connection = RabbitMQUtils.getConnection(); 14 15 Channel channel = connection.createChannel(); 16 17 channel.exchangeDeclare("topic","topic"); 18 19 for(int i=1;i<22;i++){ 20 if(i % 2==0){ 21 channel.basicPublish("topic","s.topic.t",null,(i+": my name is *").getBytes()); 22 }else if(i % 3==0){ 23 channel.basicPublish("topic","topic.one.two",null,(i+": my name is #").getBytes()); 24 } 25 } 26 RabbitMQUtils.close(connection,channel); 27 } 28 }
1 package Routing.Topic; 2 3 import com.rabbitmq.client.*; 4 import Utils.RabbitMQUtils; 5 6 import java.io.IOException; 7 8 public class Customer1 { 9 10 public static void main(String[] args) throws IOException { 11 Connection connection = RabbitMQUtils.getConnection(); 12 13 Channel channel = connection.createChannel(); 14 15 channel.exchangeDeclare("topic","topic"); 16 17 String queue = channel.queueDeclare().getQueue(); 18 19 channel.queueBind(queue,"topic","*.topic.*");//只能接受三個單詞且第二個單詞為topic的路由key訊息 20 21 channel.basicConsume(queue,true,new DefaultConsumer(channel){ 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 24 System.out.println(new String(body)); 25 } 26 }); 27 } 28 }
1 package Routing.Topic; 2 3 import com.rabbitmq.client.*; 4 import Utils.RabbitMQUtils; 5 6 import java.io.IOException; 7 8 public class Customer2 { 9 10 public static void main(String[] args) throws IOException { 11 Connection connection = RabbitMQUtils.getConnection(); 12 13 Channel channel = connection.createChannel(); 14 15 channel.exchangeDeclare("topic","topic"); 16 17 String queue = channel.queueDeclare().getQueue(); 18 19 channel.queueBind(queue,"topic","#.topic.#");//可以接收到所有第二個單詞為topic的訊息 20 21 channel.basicConsume(queue,true,new DefaultConsumer(channel){ 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 24 System.out.println(new String(body)); 25 } 26 }); 27 } 28 }
好處:更加靈活。