《Apache RocketMQ使用者指南》之批量訊息示例
阿新 • • 發佈:2018-12-22
為什麼選擇批量訊息?
批量傳送訊息可提高單次傳送訊息的效能.
使用限制
相同批次的訊息應具有:相同的主題,相同的等待訊息處理成功但是不支援定時處理. 此外,一個批量的訊息的總大小不要錯過1MB.
怎麼使用批量訊息
如果您一次只發送不超過1MB的訊息,使用批量傳送很方便:
String topic = "BatchTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); try { producer.send(messages); } catch (Exception e) { e.printStackTrace(); //handle the error }
分割成列表
只有在傳送大批量時才會增加複雜性,並且您可能不確定是否超出了大小限制(1MiB)。
目前,你最好分開列表:
public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1000 * 1000; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; //for log overhead if (tmpSize > SIZE_LIMIT) { //it is unexpected that single message exceeds the SIZE_LIMIT //here just let it go, otherwise it will block the splitting process if (nextIndex - currIndex == 0) { //if the next sublist has no element, add this one and then break, otherwise just break nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } } //then you could split the large list into small ones: ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); //handle the error } }