RabbitMQ客戶端引數效能測試(1)
最近我在公司上線了rabbitmq,替換了原來阿里出的rocketmq(別說我黑阿里的東西,這玩意真的都是坑),我並不想告訴你rabbitmq安裝過程是怎麼樣的,去看官網就知道,戳這裡
看看網上說rabbitmq效率多高多高,但是怎麼測試也只有15000Qps,還是用golang的客戶端來測試訪問的中間沒有任何處理邏輯,悲催的是java原生客戶端只能跑到11000Qps,(好吧,我承認我也黑了java),看似基本上這個問題也不大,但是我相信優化的空間應該還是蠻大的,所以做了一下測試,先上程式碼吧:
package com.enniu.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; /** * Created by coffee on 15/10/28. */ public class Main { static final String exchangeName = "testblock"; static final String routingKey = "testblock"; static final String queueName = "testblock"; private static int producterConnection_size = 1; //訊息生產者連線數 private static int consumerConnection_size = 1; //消費者連線數 private static final int consumer_size = 1;//每個消費者連線裡面開啟的consumer數量 private static int qos = 1; //Qos設定 private static long sleep_time = 0; //模擬每條訊息的處理時間 private static boolean autoAck = true; //是否預設Ack private static Logger logger = LoggerFactory.getLogger(Main.class); public static void main(String[] args) throws Exception { final AtomicLong count = new AtomicLong(10000000000L); ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("test"); factory.setPassword("test"); factory.setVirtualHost("/test"); factory.setHost("10.0.88.88"); factory.setPort(5672); //啟動監控程式 Thread t = new Thread(new Runnable() { @Override public void run() { long c = count.get(); while (c != 0){ try{ Thread.sleep(1000); long c1 = count.get(); logger.debug("每秒消費為:{}Qps",c-c1); c=c1; }catch (Exception e){ } } } }); t.start(); //啟動 for (int i=0;i<producterConnection_size;i++){ Connection conn1 = factory.newConnection(); Thread t1 = producter(conn1, count.get()); t1.start(); } //啟動consumer for (int i=0;i<consumerConnection_size;i++){ Connection conn1 = factory.newConnection(); Thread t2 = consumer(conn1, count); t2.start(); } } public static Thread consumer(final Connection conn, final AtomicLong count) throws Exception { return new Thread(new Runnable() { @Override public void run() { logger.debug("start consumer"); try { final CountDownLatch cdl = new CountDownLatch(1000); for(int i = 0;i<consumer_size;i++) { final Channel channel = conn.createChannel(); channel.basicQos(0, qos, false); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (count.decrementAndGet() == 0) { channel.basicCancel(consumerTag); cdl.countDown(); try { channel.close(); } catch (TimeoutException e) { e.printStackTrace(); } } try { Thread.sleep(sleep_time); } catch (InterruptedException e) { } if (!autoAck){ getChannel().basicAck(envelope.getDeliveryTag(), true); } } }; String consumerTag = channel.basicConsume(queueName,autoAck, "testConsumer" + i, consumer); logger.debug("consumerTag is {}", consumerTag); } cdl.await(); } catch (Exception e) { } } }); } public static Thread producter(final Connection conn, final long count) throws Exception { return new Thread(new Runnable() { @Override public void run() { logger.debug("start send Message"); try { Channel channel = conn.createChannel(); channel.exchangeDeclare(exchangeName, "direct", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); BasicProperties properties = new BasicProperties.Builder().deliveryMode(2).build(); for (long i = 0; i < count; i++) { byte[] messageBodyBytes = ("{\"merchantsId\":13}").getBytes(); channel.basicPublish(exchangeName, routingKey, properties, messageBodyBytes); // logger.debug("add message {}",i); } channel.close(); conn.close(); } catch (Exception e) { e.printStackTrace(); } } }); } }
說說我的測試環境,兩臺vm安裝rabbitmq做叢集,配置是CPU:2core,mem:2GB,disk:187G sata
客戶端是我的macpro,雷電口轉千兆網絡卡接入測試網路.但是據說中間有一個交換機是百兆的(哭死),所以只能按照百兆連線來計算.
測試一:
測試內容:
啟動一個生產者,無消費者
配置:
private static int producterConnection_size = 1; //訊息生產者連線數 private static int consumerConnection_size = 0; //消費者連線數 private static final int consumer_size = 1;//每個消費者連線裡面開啟的consumer數量 private static int qos = 1; //Qos設定 private static long sleep_time = 0; //模擬每條訊息的處理時間 private static boolean autoAck = true; //是否預設Ack
結果:
自己看圖,基本上不用我解釋
測試二:
測試內容:
啟動三個生產者,無消費者
配置:
private static int producterConnection_size = 3; //訊息生產者連線數 private static int consumerConnection_size = 0; //消費者連線數 private static final int consumer_size = 1;//每個消費者連線裡面開啟的consumer數量 private static int qos = 1; //Qos設定 private static long sleep_time = 0; //模擬每條訊息的處理時間 private static boolean autoAck = true; //是否預設Ack
結果:
另外我還發現mq已經啟動了流控,看來他已經認為我傳送過快.
由此可見生產者的個數對整體的訊息publish沒有太大的影響,至少在單機的情況下是這樣,而且publish的時候會一直抖動,不會一直保持在一個範圍內,應該是流控機制引起的,導致它無法超越15k/s的速率,具體怎麼優化流控還請各位大牛指教,我暫時還沒有什麼頭緒.
測試三:
測試內容:
啟動無生產者,一個消費者,Qos為0.預設Ack,接收到訊息馬上返回,不休眠
配置:
private static int producterConnection_size = 0; //訊息生產者連線數
private static int consumerConnection_size = 1; //消費者連線數
private static final int consumer_size = 1;//每個消費者連線裡面開啟的consumer數量
private static int qos = 0; //Qos設定
private static long sleep_time = 0; //模擬每條訊息的處理時間
private static boolean autoAck = true; //是否預設Ack
結果:
測試四:
測試內容:
啟動無產者,一個消費者,Qos為1.預設Ack,接收到訊息馬上返回,不休眠
配置:
private static int producterConnection_size = 0; //訊息生產者連線數
private static int consumerConnection_size = 1; //消費者連線數
private static final int consumer_size = 1;//每個消費者連線裡面開啟的consumer數量
private static int qos = 1; //Qos設定
private static long sleep_time = 0; //模擬每條訊息的處理時間
private static boolean autoAck = true; //是否預設Ack
結果:
貌似這裡沒有什麼差別,消費能力都是在10k/s左右,看來Qos設定為1還是不設定並不影響消費能力,可以吧Qos加大看看效果.
測試五:
測試內容:
啟動無產者,一個消費者,Qos為10.預設Ack,接收到訊息馬上返回,不模擬業務處理時間
配置:
private static int producterConnection_size = 0; //訊息生產者連線數
private static int consumerConnection_size = 1; //消費者連線數
private static final int consumer_size = 1;//每個消費者連線裡面開啟的consumer數量
private static int qos = 10; //Qos設定
private static long sleep_time = 0; //模擬每條訊息的處理時間
private static boolean autoAck = true; //是否預設Ack
結果:
這裡Qos的增加並沒有對消費效率產生影響,其實這是說得通的,Qos本質是控制consumer處理訊息的快取大小,
==但是如果在網路比較差得情況下Qos=1和Qos=10對消費會有很大的差異==.
例如,訊息從mq傳遞到consumer需要50ms,處理只需要5ms的時候,如果Qos=1,那麼就必須等到這條訊息消費完了再分配下一條訊息,這樣一條訊息處理的整體時間是50*2+5=105ms,但是如果這時Qos=10的話,相當於當一條訊息到達之後不用等訊息處理完,可以就再分配下一條訊息,這樣基本上保證時時刻刻都有訊息都消費,不需要等待網路傳輸的時間當快取訊息達到10條的時候正好可以吧傳輸的50ms衝抵掉.
今天最最後一個測試,就是publish對consumer的影響到底多大.
測試六:
測試內容:
啟動一個產者,一個消費者,Qos為10.預設Ack,接收到訊息馬上返回,不模擬業務處理時間
配置:
private static int producterConnection_size = 1; //訊息生產者連線數
private static int consumerConnection_size = 1; //消費者連線數
private static final int consumer_size = 1;//每個消費者連線裡面開啟的consumer數量
private static int qos = 10; //Qos設定
private static long sleep_time = 0; //模擬每條訊息的處理時間
private static boolean autoAck = true; //是否預設Ack
結果:
當然已經觸發了流控的,如下圖:
流控機制可以自行google,這裡不做描述
看來consumer與publish同時工作的話還是有影響的,這種影響到底有多大,因素有哪些,就這一個測試當然不能說明,由於今天時間比較晚了,明天繼續.