rocketMQ生產者和消費者
阿新 • • 發佈:2019-01-22
<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">簡單寫一個MQ的生產者和消費者。生產者生產4個topic。消費者訂閱消費,多執行緒啟動4個執行緒,每個執行緒新建一個消費者來消費一個topic的資料。rocketMQ本身就是多執行緒的,設定每個消費者的執行緒數為5個。例子如下:</span>
生產者程式碼如下:
import javax.annotation.PreDestroy; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; public class Producer { public static DefaultMQProducer defaultMQproducer; static String rocketmqAddress ="10.20.18.20:9876"; public static DefaultMQProducer init() { if (null == defaultMQproducer){ defaultMQproducer = new DefaultMQProducer("messageGroup"); defaultMQproducer.setNamesrvAddr(rocketmqAddress); defaultMQproducer.setInstanceName("messageProducer"); defaultMQproducer.setMaxMessageSize(9999999); try { defaultMQproducer.start(); } catch (MQClientException e) { } } return defaultMQproducer; } @PreDestroy public void preDestroy(){ if (defaultMQproducer != null) { defaultMQproducer.shutdown(); } } }
import com.alibaba.fastjson.JSON; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class ProduceCar { public static void main(String args[]){ for (int i=1;i<=4;i++){ for (int j=1;j<=100;j++){ Message msg = new Message("hello"+String.valueOf(i), JSON.toJSONString(i+"hello"+j).getBytes()); DefaultMQProducer difaultProducer = Producer.init(); try { SendResult sendResult = difaultProducer.send(msg); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }} System.out.println("finished"); } }
消費者程式碼如下:
import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class Consumer { public static void main(String args[]){ try{ Executor executor = Executors.newFixedThreadPool(4); Map<String,String> map = new HashMap(); int i=0; map.put("hello1", "5"); map.put("hello2", "5"); map.put("hello3", "5"); map.put("hello4", "5"); for (String key : map.keySet()) { i+=1; //logger.aduit("Key = {0}, Value = {1}",entry.getKey(),entry.getValue()); ConsumerFactory runComsumer = new ConsumerFactory(); runComsumer.setTopics(key); runComsumer.setInstanceName("ConsumerInstance"+key); runComsumer.setThreadNum(Integer.valueOf(map.get(key))); runComsumer.setGroupName("Consumer"+key); executor.execute(runComsumer); } }catch(Exception e){ System.out.println("create consumer err:{0}"+ e.getMessage()); } } }
import java.util.List;
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
public class ConsumerFactory extends Thread{
public final static String ANDROID = "Android";
public final static String IOS = "Ios";
private String rocketmqAddress="10.20.18.20:9876";
int threadNum;
String topics;
String instanceName;
String groupName;
DefaultMQPushConsumer consumer = null;
@Override
public void run() {
try {
consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(rocketmqAddress);//MQ地址
consumer.setClientCallbackExecutorThreads(threadNum);//消費現場數量
consumer.setInstanceName(instanceName);//例項名稱
consumer.subscribe(topics, "*");
//註冊監聽
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (int i = 0; i < msgs.size(); i++) {
MessageExt msgExt = msgs.get(i);
String msgId = msgExt.getMsgId();
String mesBody = new String(msgExt.getBody());
System.out.println(mesBody);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("執行緒執行中");
} catch (Exception e) {
System.out.println(e);
}
}
public int getThreadNum() {
return threadNum;
}
public void setThreadNum(int threadNum) {
this.threadNum = threadNum;
}
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
}