RocketMQ訊息傳送之pull和push
阿新 • • 發佈:2019-01-06
RocketMQ學習(五)——RocketMQ訊息傳送之pull和push
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { String group_name = "pull_producer"; DefaultMQProducer producer = new DefaultMQProducer(group_name); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < 5; i++) { try { Message msg = new Message("TopicPull",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg,1000); System.out.println(sendResult); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); Thread.sleep(2000); } } producer.shutdown(); } }
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { String group_name = "pull_producer"; DefaultMQProducer producer = new DefaultMQProducer(group_name); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < 5; i++) { try { Message msg = new Message("TopicPull",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg,1000); System.out.println(sendResult); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); Thread.sleep(2000); } } producer.shutdown(); } }
import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullTaskCallback; import org.apache.rocketmq.client.consumer.PullTaskContext; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; public class PullScheduleService { public static void main(String[] args) throws InterruptedException, MQClientException { String group_name = "schedule_consumer"; String TOPIC_TEST = "TopicPull"; final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name); DefaultMQPullConsumer consumer = scheduleService.getDefaultMQPullConsumer(); consumer.setNamesrvAddr("localhost:9876"); scheduleService.setMessageModel(MessageModel.CLUSTERING); scheduleService.registerPullTaskCallback(TOPIC_TEST, new PullTaskCallback() { public void doPullTask(MessageQueue mq, PullTaskContext context) { MQPullConsumer consumer = context.getPullConsumer(); try { //獲取從哪裡開始拉取 long offset = consumer.fetchConsumeOffset(mq, false); if(offset < 0) { offset = 0; } PullResult pullResult = consumer.pull(mq, "*", offset, 32); switch (pullResult.getPullStatus()) { case FOUND: List<MessageExt> list = pullResult.getMsgFoundList(); for (MessageExt msg : list) { System.out.println(new String(msg.getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: case OFFSET_ILLEGAL: break; default: break; } //儲存offset,客戶端每隔5s會定時重新整理到broker consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); //重新拉取 建議超過5s這樣就不會重複獲取 context.setPullNextDelayTimeMillis(6000); } catch (Exception e) { e.printStackTrace(); } } }); scheduleService.start(); } }