com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
阿新 • • 發佈:2018-06-17
oid put endpoint 以及 any -m pivotal you log
RabbitMQ 基於Erlang 實現, 客戶端可以用Python | Java | Ruby | PHP | C# | Javascript | Go等語言來實現。這裏做個java語言的測試。
首先安裝好RabbitMQ 服務端。
maven依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
java測試代碼如下:
//定義隊列
EndPoint.java
/**
 * Base class for the demo's queue endpoints (producer and consumer).
 *
 * <p>The constructor opens a connection to the RabbitMQ broker, creates a
 * channel on it and declares the queue named by {@code endpointName}.
 */
public abstract class EndPoint{
    protected Channel channel;
    protected Connection connection;
    protected String endPointName;

    /**
     * Connects to the broker and declares the queue this endpoint works with.
     *
     * @param endpointName name of the queue to declare and use
     * @throws IOException if the connection, the channel or the queue
     *         declaration fails
     */
    public EndPoint(String endpointName) throws IOException{
        this.endPointName = endpointName;

        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();

        //hostname of your rabbitmq server
        factory.setHost("192.168.163.33");
        factory.setPort(5672);
        factory.setUsername("test");
        factory.setPassword("test");
        // NOTE(review): if the broker is configured with a dedicated virtual
        // host, it must also be set here via factory.setVirtualHost(...);
        // otherwise the broker resets the connection during the handshake.

        //getting a connection; without this the field stays null and the
        //createChannel() call below fails with a NullPointerException
        connection = factory.newConnection();

        //creating a channel
        channel = connection.createChannel();

        //declaring a queue for this channel. If queue does not exist,
        //it will be created on the server.
        channel.queueDeclare(endpointName, false, false, false, null);
    }

    /**
     * Close channel and connection. Not necessary as it happens implicitly any way.
     * The connection is closed even when closing the channel fails.
     *
     * @throws IOException if closing the channel or the connection fails
     */
    public void close() throws IOException{
        try {
            this.channel.close();
        } finally {
            this.connection.close();
        }
    }
}
//生產者
Producer.java
/**
 * Endpoint that publishes serialized objects to its queue.
 */
public class Producer extends EndPoint{

    /**
     * @param endPointName name of the queue to publish to
     * @throws IOException if the underlying endpoint cannot be set up
     */
    public Producer(String endPointName) throws IOException{
        super(endPointName);
    }

    /**
     * Serializes {@code object} and publishes the bytes on the channel,
     * using the empty-string exchange and the endpoint name as routing key.
     *
     * @param object the payload to send
     * @throws IOException if publishing fails
     */
    public void sendMessage(Serializable object) throws IOException {
        final String exchange = "";
        final byte[] payload = SerializationUtils.serialize(object);
        channel.basicPublish(exchange, endPointName, null, payload);
    }
}
//消費隊列
QueueConsumer.java
/**
 * Endpoint that consumes messages from its queue. It is a {@code Runnable}
 * so that the subscription can be started from a separate thread, and it
 * registers itself as the channel's {@code Consumer} callback.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer{

    /**
     * @param endPointName name of the queue to consume from
     * @throws IOException if the underlying endpoint cannot be set up
     */
    public QueueConsumer(String endPointName) throws IOException{
        super(endPointName);
    }

    /**
     * Registers this object as consumer of the queue.
     */
    public void run() {
        try {
            //start consuming messages. Auto acknowledge messages.
            channel.basicConsume(endPointName, true, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when consumer is registered.
     */
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer "+consumerTag +" registered");
    }

    /**
     * Called when new message is available. The body is deserialized and is
     * expected to be a map holding a "message number" entry.
     */
    public void handleDelivery(String consumerTag, Envelope env,
            BasicProperties props, byte[] body) throws IOException {
        // Cast to the Map interface with wildcards instead of a raw HashMap:
        // only get() is needed, so any Map implementation is acceptable.
        Map<?, ?> map = (Map<?, ?>) SerializationUtils.deserialize(body);
        System.out.println("Message Number "+ map.get("message number") + " received.");
    }

    /** Intentionally empty: this demo does not react to cancellation. */
    public void handleCancel(String consumerTag) {}

    /** Intentionally empty: this demo does not react to cancel confirmation. */
    public void handleCancelOk(String consumerTag) {}

    /** Intentionally empty: this demo does not react to recover confirmation. */
    public void handleRecoverOk(String consumerTag) {}

    /** Intentionally empty: this demo does not react to shutdown signals. */
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}
//測試用例
Main.java
/**
 * Test driver: starts a consumer thread on the queue named "queue" and then
 * publishes 100000 numbered messages to the same queue.
 */
public class Main {

    /**
     * Runs the whole demo.
     *
     * @throws Exception if either endpoint cannot be created or a message
     *         cannot be sent
     */
    public Main() throws Exception{
        QueueConsumer consumer = new QueueConsumer("queue");
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        Producer producer = new Producer("queue");

        for (int i = 0; i < 100000; i++) {
            // Declared as HashMap (not Map) because sendMessage requires a
            // Serializable argument.
            HashMap<String, Integer> message = new HashMap<String, Integer>();
            message.put("message number", i);
            producer.sendMessage(message);
            System.out.println("Message Number "+ i +" sent.");
        }
    }

    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the demo fails to connect or to publish
     */
    public static void main(String[] args) throws Exception{
        new Main();
    }
}
運行報以下錯誤
Exception in thread "main" com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:355)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
at com.tony.test.EndPoint.<init>(EndPoint.java:26)
at com.tony.test.QueueConsumer.<init>(QueueConsumer.java:16)
at com.tony.test.test.<init>(test.java:13)
at com.tony.test.test.main(test.java:33)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:202)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:347)
... 6 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:209)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:515)
解決方法這個 或者這個
網上Rabbit性能測試
性能測試
上圖可以看到每秒百萬級別的消息進出數量,以及2343條的消息在隊列中等待。
前段時間我調試MQ的時候也遇到如標題所示的錯誤。當時對方並沒有提供MQ的v-host,我試過將其設為"/"以及不配置該項,但都報同樣的錯誤;後經檢查發現,服務端的MQ配置了v-host,客戶端必須透過 factory.setVirtualHost(...) 指定相同的v-host才能連上。
com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)