1. 程式人生 > RabbitMQ生產者消費者

RabbitMQ生產者消費者

.com factory fast eat serial main hash new t factor

package com.ra.car.rabbitMQ;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; /** * 生產者,用來發送拍照指令 * *
*/ public class RabbitMQProducer { protected static final Logger logger = LoggerFactory.getLogger(RabbitMQProducer.class); private final static String QUEUE_NAME = "44b2fe8a-4d70-4a18-b75f-91a6f170bd16"; // 上送隊列 private String message; public RabbitMQProducer() { } public RabbitMQProducer(String message) {
this.message = message; } public void sendMessage(){ String replyQueueName = null; // 返回隊列名 ConnectionFactory connFactory = null; Connection conn = null; Channel channel = null; try { connFactory = new ConnectionFactory(); connFactory.setHost("58.211.54.147"); connFactory.setUsername("customer"); connFactory.setPassword("123456"); connFactory.setPort(5672); conn = connFactory.newConnection(); channel = conn.createChannel(); QueueingConsumer consumer = new QueueingConsumer(channel); Map<String, Object> param = new HashMap<String, Object>(); param.put("x-message-ttl", 600000); param.put("x-expires", 86400000); // 返回隊列 replyQueueName = channel.queueDeclare().getQueue(); channel.basicConsume(replyQueueName, true, consumer); String corrId = UUID.randomUUID().toString(); // 用來表示返回隊列結果的id,唯一 BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); channel.queueDeclare(QUEUE_NAME, true, false, false, param); channel.basicPublish("", QUEUE_NAME, props, message.getBytes()); logger.info("producer has published: \"" + message + "\""); } catch (IOException ioe) { ioe.printStackTrace(); } catch (TimeoutException toe) { toe.printStackTrace(); } catch (ShutdownSignalException e) { e.printStackTrace(); } catch (ConsumerCancelledException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { if (channel != null) try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } if (conn != null) try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) { JSONObject json = new JSONObject(); json.put("msgId", "8801"); json.put("gpsNo", "001709270202"); json.put("channelId", "1");// 1,2 前置 後置 json.put("serialNo", "1133"); //RabbitMQProducer rb = new RabbitMQProducer(json.toString()); //Thread t = new Thread(rb); //t.start(); //String str = "7e0805000901170427881200090003000001b6b5833313"; 
//System.out.println(str.substring(26, 36)+"**"+str.substring(36,str.length()-2)); } }
package com.ra.car.rabbitMQ;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.ra.truck.model.MessagePackage;
import com.ra.truck.service.MessagePackegerService;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.ContextLoader;

import com.alibaba.fastjson.JSON;
import com.ra.car.utils.StringToT;
import com.ra.common.util.UuidUtil;
import com.ra.truck.service.DataCallBackService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * Consumer.
 *
 * <p>Reads raw frames from {@code QUEUE_NAME}, converts them to a hex string,
 * splits the string into header fields and body, and stores the result through
 * {@code DataCallBackService}.
 */
public class RabbitMQCustomer implements Runnable {
    protected static final Logger logger = LoggerFactory
            .getLogger(RabbitMQCustomer.class);
    private final static String QUEUE_NAME = "adb65b08-a27d-42b0-b4ac-ff10422ac213";

    private ConnectionFactory connFactory;
    private Connection conn;
    private Channel channel;
    private Delivery delivery;
    private QueueingConsumer consumer;
    // Per-instance and only used from run(); SimpleDateFormat must not be shared across threads.
    private SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private MessagePackegerService messagepackegerService = (MessagePackegerService)ContextLoader
            .getCurrentWebApplicationContext().getBean("MessagePackegerService");
    private DataCallBackService dataCallBackService = (DataCallBackService) ContextLoader
            .getCurrentWebApplicationContext().getBean("DataCallBackService");

    /**
     * Connects to the broker and consumes messages until the connection fails
     * or the thread is interrupted.
     *
     * <p>Two separate try/catch levels are used: (1) a connection-level failure
     * ends the loop and closes the connection; (2) a failure while processing a
     * single message is logged and consumption continues.
     */
    @Override
    public void run() {
        try {
            // NOTE(review): broker address and credentials are hard-coded; consider
            // moving them to configuration.
            connFactory = new ConnectionFactory();
            connFactory.setHost("58.211.54.147");
            connFactory.setUsername("customer");
            connFactory.setPassword("123456");
            connFactory.setPort(5672);
            conn = connFactory.newConnection();
            channel = conn.createChannel();

            Map<String, Object> param = new HashMap<String, Object>();
            param.put("x-message-ttl", 600000);
            param.put("x-expires", 86400000);

            channel.queueDeclare(QUEUE_NAME, true, false, false, param);

            logger.info("listening for event message...");

            consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);

            while (true) {
                // Waiting for the next delivery stays OUTSIDE the per-message try:
                // an interrupt or a closed connection must end the loop instead of
                // being logged and retried forever.
                Thread.sleep(100);
                delivery = consumer.nextDelivery();
                try {
                    handleDelivery(delivery.getBody());
                } catch (Exception e) {
                    logger.error("循環消費數據異常.....", e);
                }
                // No reply is sent back to the producer.
            }
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can observe the interrupt.
            Thread.currentThread().interrupt();
            logger.info("MQ consumer interrupted, stopping");
        } catch (Exception e) {
            logger.error("MQ connection error.....", e);
        } finally {
            closeQuietly();
        }
    }

    /**
     * Parses one raw frame and stores it.
     *
     * <p>Frames whose hex form is blank or not longer than 28 characters are ignored.
     *
     * @param body raw message bytes as delivered by the broker
     * @throws Exception if a field offset is out of range or a collaborating service fails;
     *                   the caller logs it and moves on to the next message
     */
    private void handleDelivery(byte[] body) throws Exception {
        logger.info("*****當前時間:{}*****", df.format(new Date()));
        logger.info("*****原始數據為:{}*****", JSON.toJSONString(body));
        String msg = MQUtils.bytes2Hex(body);
        logger.info("receive msg:{}", msg);

        if (StringUtils.isBlank(msg) || msg.length() <= 28) {
            return;
        }

        // Drop the trailing flag (last two hex chars), then restore escaped bytes.
        msg = unescapeHex(msg.substring(0, msg.length() - 2));
        int bodyEnd = msg.length() - 2; // the last two hex chars are the check code

        MessagePackage msgPackeger = new MessagePackage();
        msgPackeger.setId(UuidUtil.create());
        String messageId = msg.substring(2, 6);
        msgPackeger.setMessageId(messageId);                  // message id
        msgPackeger.setMessageProperty(msg.substring(6, 10)); // message body properties
        msgPackeger.setImei(msg.substring(10, 22));           // terminal phone number
        msgPackeger.setSerialNumber(msg.substring(22, 26));   // serial number
        msgPackeger.setCheckCode(msg.substring(bodyEnd));     // check code

        if ("0801".equals(messageId)) {
            if (!fillMediaMessage(msgPackeger, msg, bodyEnd)) {
                return;
            }
        } else if ("0805".equals(messageId)) {
            msgPackeger.setMessageBody(msg.substring(26, 36));
            msgPackeger.setMeidiaId(msg.substring(36, bodyEnd));
        } else {
            msgPackeger.setMessageBody(msg.substring(26, bodyEnd)); // message body
        }

        dataCallBackService.insertDeviceRawDataOfOne(msgPackeger);
    }

    /**
     * Fills media id, split marker and body for a message with id {@code 0801},
     * which may arrive as a single frame or as several sub-packets.
     *
     * @param msgPackeger record being populated
     * @param msg         un-escaped hex frame without the trailing flag
     * @param bodyEnd     index in {@code msg} where the body ends and the check code begins
     * @return {@code false} if this is a follow-up sub-packet whose first packet
     *         could not be found, in which case the record must not be stored
     */
    private boolean fillMediaMessage(MessagePackage msgPackeger, String msg, int bodyEnd)
            throws Exception {
        // Binary form of the message body properties; the character at index 2
        // tells whether the message is split into sub-packets.
        String propertyBits = StringToT.hexString2binaryString(msg.substring(6, 10));
        boolean subPackaged = "1".equals(propertyBits.substring(2, 3));

        if (!subPackaged) {
            msgPackeger.setMeidiaId(msg.substring(26, 34));
            msgPackeger.setMessageBody(msg.substring(34, bodyEnd));
            return true;
        }

        String packetCount = msg.substring(26, 30); // number of sub-packets
        String packetIndex = msg.substring(30, 34); // sequence number of this sub-packet
        String splitMarker = packetCount + packetIndex;

        if ("0001".equals(packetIndex)) {
            // The first sub-packet carries the media id itself.
            msgPackeger.setMessageSplit(splitMarker);
            msgPackeger.setMeidiaId(msg.substring(34, 42));
            msgPackeger.setMessageBody(msg.substring(42, bodyEnd));
            return true;
        }

        // Follow-up sub-packet: look up the stored first packet (queried with split
        // "0001") to reuse its media id and the first 64 hex chars of its body.
        msgPackeger.setMessageSplit("0001");
        List<MessagePackage> firstPackets = messagepackegerService.selectMessagepackeger(msgPackeger);
        if (firstPackets == null || firstPackets.isEmpty()) {
            logger.warn("first sub-packet not found, dropping sub-packet {} of message: {}",
                    splitMarker, msg);
            return false;
        }
        MessagePackage firstPacket = firstPackets.get(0);
        String meidiaId = firstPacket.getMeidiaId();
        String mediaHeader = firstPacket.getMessageBody().substring(0, 64);

        msgPackeger.setMessageSplit(splitMarker);
        msgPackeger.setMeidiaId(meidiaId);
        msgPackeger.setMessageBody(mediaHeader + msg.substring(34, bodyEnd));
        return true;
    }

    /**
     * Restores escaped bytes in a hex string: {@code 7d02} becomes {@code 7e} and
     * {@code 7d01} becomes {@code 7d}.
     *
     * <p>The string is walked two characters (one byte) at a time so that a
     * pattern straddling two neighbouring bytes (e.g. {@code 17 d0 2a}) is never
     * mistaken for an escape sequence. Matching is case-sensitive; this assumes the
     * hex string is lower-case, as the literals used here imply (TODO confirm
     * against {@code MQUtils.bytes2Hex}).
     *
     * @param hex hex string with two characters per byte
     * @return the hex string with escape sequences restored
     */
    private static String unescapeHex(String hex) {
        StringBuilder restored = new StringBuilder(hex.length());
        int pos = 0;
        while (pos < hex.length()) {
            if (hex.startsWith("7d02", pos)) {
                restored.append("7e");
                pos += 4;
            } else if (hex.startsWith("7d01", pos)) {
                restored.append("7d");
                pos += 4;
            } else {
                int next = Math.min(pos + 2, hex.length());
                restored.append(hex, pos, next);
                pos = next;
            }
        }
        return restored.toString();
    }

    /**
     * Closes the channel and the connection independently, so a failure on one
     * never prevents the other from being closed or escapes from {@link #run()}.
     */
    private void closeQuietly() {
        if (channel != null) {
            try {
                logger.info("channel.close");
                channel.close();
            } catch (Exception e) {
                logger.warn("channel close failed", e);
            }
        }
        if (conn != null) {
            try {
                logger.info("conn.close");
                conn.close();
            } catch (Exception e) {
                logger.warn("conn close failed", e);
            }
        }
    }
}

RabbitMQ生產者消費者