RocketMQ自定義selector實現訊息通道定向傳送和拉取
阿新 • • 發佈:2019-01-23
RocketMQ的簡單應用請參考官網github樣例
本篇介紹如何通過自定義selector實現按messageQueue定向傳送和接收訊息
我們先看看MessageQueueSelector介面
/**
 * Strategy used when sending a message to decide which message queue it goes to.
 */
public interface MessageQueueSelector {
/**
 * Chooses the destination queue for a message.
 *
 * @param mqs the collection of candidate message queues
 * @param msg the message being sent
 * @param arg caller-supplied extension argument
 * @return the message queue chosen for {@code msg}
 */
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
RocketMQ通過MessageQueueSelector中實現的演算法來確定訊息傳送到哪一個佇列上。RocketMQ預設提供了三種實現，分別是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。MessageQueueSelector的select方法提供了三個入參，分別為訊息佇列集合、訊息和擴充套件引數。
1、pom.xml引入rocketmq jar包
<!-- 引入rocketmq --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> <!-- 提供常用的lang包工具類 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency>
2、MessageQueueSelector介面實現
package com.lh.rocketmq.selector;

import java.util.List;

import org.apache.commons.lang3.math.NumberUtils;

import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;

/**
 * Selector that sends a message to the queue at a caller-chosen position in the
 * candidate list.
 * <p>
 * Usage: {@code producer.send(msg, new SelectMessageQueueByExtOrg(), queueId)}.
 *
 * @author lh
 * @since 2017-4-22
 * @version 1.0.0
 */
public class SelectMessageQueueByExtOrg implements MessageQueueSelector {

    /**
     * Returns the queue located at the index described by {@code arg}.
     *
     * @param mqs candidate message queues; must be non-empty
     * @param msg the message being sent (not inspected by this selector)
     * @param arg the target index; its {@code toString()} value is parsed as an int
     * @return the element of {@code mqs} at the parsed index
     * @throws IllegalArgumentException if {@code mqs} is null or empty, {@code arg}
     *         is null, or the parsed index is outside {@code [0, mqs.size())}
     */
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        if (mqs == null || mqs.isEmpty()) {
            throw new IllegalArgumentException("no message queues available for selection");
        }
        if (arg == null) {
            throw new IllegalArgumentException("queue index argument must not be null");
        }
        // NumberUtils.toInt yields 0 for text that is not a valid int, so such an
        // argument still selects index 0, exactly as before.
        int index = NumberUtils.toInt(arg.toString());
        if (index < 0 || index >= mqs.size()) {
            throw new IllegalArgumentException(
                    "queue index " + index + " out of range, available queues: " + mqs.size());
        }
        return mqs.get(index);
    }
}
3、producer通過自定義的MessageQueueSelector 傳送訊息
package com.lh.rocketmq.producer;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.lh.rocketmq.common.MqConst;
import com.lh.rocketmq.selector.SelectMessageQueueByExtOrg;
/**
 * Producer demo that sends messages through the custom MessageQueueSelector so
 * that each message goes to an explicitly chosen queue.
 *
 * @author lh
 * @since 2017-4-23
 */
public class ProducerByExtOrgSelector {

    /** Total number of demo messages to send. */
    private static final int MESSAGE_COUNT = 16000;

    /** Number of queue ids the messages are spread across (ids 0..QUEUE_COUNT-1). */
    private static final int QUEUE_COUNT = 4;

    /**
     * Starts a producer, sends {@link #MESSAGE_COUNT} messages round-robin over the
     * queue ids and prints each send result; the producer is always shut down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr(MqConst.NAME_SRV_ADDR);
        // The selector has no fields, so one instance is shared by every send.
        SelectMessageQueueByExtOrg selector = new SelectMessageQueueByExtOrg();
        // Encode explicitly instead of relying on the platform default charset;
        // the pull consumer in this demo decodes message bodies as utf-8.
        java.nio.charset.Charset utf8 = java.nio.charset.Charset.forName("utf-8");
        try {
            producer.start();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                int queueId = i % QUEUE_COUNT;
                // The tag carries the queue id as a suffix, e.g. "push0".
                Message msg = new Message(MqConst.TOPIC_NAME, MqConst.TAG_PUSH + queueId, "key" + i,
                        ("hello rocketmq " + i).getBytes(utf8));
                SendResult result = producer.send(msg, selector, queueId);
                System.out.println("offset=" + result.getQueueOffset() + ", msgId=" + result.getMsgId()
                        + ", sendStatus=" + result.getSendStatus());
            }
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}
mq常量類
package com.lh.rocketmq.common;
/**
 * Constants shared by the RocketMQ demo classes.
 *
 * @author lh
 */
public final class MqConst {

    /**
     * Name server address.
     */
    public static final String NAME_SRV_ADDR = "192.168.191.130:9876";

    /**
     * Topic name.
     */
    public static final String TOPIC_NAME = "rocketmq-simple-demo";

    /**
     * Broker name.
     */
    public static final String BROKER_NAME = "localhost.localdomain";

    /**
     * Tag prefix.
     */
    public static final String TAG_PUSH = "push";

    /**
     * Property key for a message's target queue id.
     * Corresponds to Message.getUserProperty(MqConst.MESSAGE_KEY_QUEUE_ID).
     */
    public static final String MESSAGE_KEY_QUEUE_ID = "queueId";

    /**
     * Identifier used in the test where the queue id and the tag are the same.
     */
    public static final int TARGET_QUEUEID_TAG = 0;

    /** Constants holder; not meant to be instantiated. */
    private MqConst() {
    }
}
4、通過指定的messageQueue拉取訊息
package com.lh.rocketmq.consumer;
import java.nio.charset.Charset;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.lh.rocketmq.common.MqConst;
/**
 * Pull-consumer demo that reads messages from one explicitly specified message queue.
 *
 * @author lh
 * @since 2017-04-23
 */
public class PullConsumerByQueueId {

    /** Maximum number of messages requested per pull call. */
    private static final int PULL_BATCH_SIZE = 32;

    /**
     * Pulls from the queue identified by the MqConst topic, broker name and target
     * queue id, starting at offset 0, printing every message body until the next
     * offset reaches the reported max offset; finally prints the elapsed time.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumer");
        consumer.setNamesrvAddr(MqConst.NAME_SRV_ADDR);
        // Resolve the charset once rather than once per message.
        Charset utf8 = Charset.forName("utf-8");
        long offset = 0;
        long maxOffset = offset;
        try {
            consumer.start();
            MessageQueue mq = new MessageQueue(MqConst.TOPIC_NAME, MqConst.BROKER_NAME, MqConst.TARGET_QUEUEID_TAG);
            do {
                PullResult result = consumer.pullBlockIfNotFound(mq, null, offset, PULL_BATCH_SIZE);
                List<MessageExt> msgs = result.getMsgFoundList();
                // The found-list may be null; an empty list simply iterates zero times.
                if (msgs != null) {
                    for (MessageExt msg : msgs) {
                        System.out.println(new String(msg.getBody(), utf8));
                    }
                }
                offset = result.getNextBeginOffset();
                maxOffset = result.getMaxOffset();
                System.out.println("offset="+offset+", status="+result.getPullStatus());
            } while (offset < maxOffset);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.shutdown();
            long endTime = System.currentTimeMillis();
            System.out.println("PullConsumerByQueueId\t take times="+ (endTime - startTime));
        }
    }
}
附件為工程程式碼，有需要的同學請自行下載。