3.RabbitMQ Work
package com.study.soufang.rabbit.a001.work;
public class ConstantOfWork {
public static final String QUEUE_NAME = "hello"; }
package com.study.soufang.rabbit.a001.work;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.study.soufang.rabbit.a001.RabbitChannelUtil;
import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(chain = true) public class MyConsumer extends DefaultConsumer {
long sleepTime; public MyConsumer(Channel channel) { super(channel); }
@Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { try { String message = new String(body, "UTF-8"); System.out.println(Thread.currentThread().getName()+" [x] Received '" + message + "'"); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); /** * deliveryTag:該訊息的index multiple:是否批量.true:將一次性ack所有小於deliveryTag的訊息。 */ getChannel().basicAck(envelope.getDeliveryTag(), false); /*try { RabbitChannelUtil.closeChannel(getChannel()); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); }*/ } }
}
package com.study.soufang.rabbit.a001.work;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.study.soufang.rabbit.a001.RabbitChannelUtil;
public class Recv {
public static void doRecv(long sleepTime) throws IOException, TimeoutException{ Channel channel = null; try { channel = RabbitChannelUtil.createChannel(); channel.queueDeclare(ConstantOfWork.QUEUE_NAME, false, false, false, null); channel.basicQos(1);//同一時刻伺服器只會發一條訊息給消費者 Consumer consumer = new MyConsumer(channel).setSleepTime(sleepTime); /** * autoAck:是否自動ack,如果不自動ack,需要使用channel.ack、channel.nack、channel.basicReject 進行訊息應答 */ boolean autoAck = false; String result = channel.basicConsume(ConstantOfWork.QUEUE_NAME, autoAck, consumer); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, TimeoutException { new Thread(new Runnable() { @Override public void run() { try { doRecv(500); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { doRecv(2000); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); } }
package com.study.soufang.rabbit.a001.work;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.study.soufang.rabbit.a001.RabbitChannelUtil;
public class Send {
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = null; try { channel = RabbitChannelUtil.createChannel(); /** * 宣告訊息要傳送到的佇列 */ channel.basicPublish("", ConstantOfWork.QUEUE_NAME, null, message.getBytes()); Thread.sleep(500); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); }finally{ RabbitChannelUtil.closeChannel(channel); } } }
package com.study.soufang.rabbit.a001;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class RabbitChannelUtil {
private final static String RABBIT_HOST= "192.168.10.22"; private final static int RABBIT_PORT=5672; public static Channel createChannel() throws IOException, TimeoutException{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(RABBIT_HOST); factory.setPort(RABBIT_PORT); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("helloworld"); Connection connection = null; Channel channel = null; connection = factory.newConnection(); channel = connection.createChannel(); return channel; } /** * 關閉連線後不會再監聽 * @param channel * @throws IOException * @throws TimeoutException */ public static void closeChannel(Channel channel) throws IOException, TimeoutException{ Connection connection = channel.getConnection(); if(null != channel){ channel.close(); } if(null != connection){ connection.close(); } } }