Apache kafka客戶端開發demo
阿新 • • 發佈:2019-01-29
該部落格轉載自:http://www.aboutyun.com/thread-9906-1-1.html
1.依賴包
-
<dependency>
-
<groupId>org.apache.kafka</groupId>
-
<artifactId>kafka_2.10</artifactId>
-
<version>0.8.1</version>
- </dependency>
2.producer程式開發例子
2.1 producer引數說明
-
#指定kafka節點列表,用於獲取metadata,不必全部指定
-
metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092
-
# 指定分割槽處理類。預設kafka.producer.DefaultPartitioner,表通過key雜湊到對應分割槽
-
#partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner
-
# 是否壓縮,預設0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後訊息中會有頭來指明訊息壓縮型別,故在消費者端訊息解壓是透明的無需指定。
-
compression.codec=none
-
# 指定序列化處理類(mafka client API呼叫說明-->3.序列化約定wiki),預設為kafka.serializer.DefaultEncoder,即byte[]
-
serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder
-
# serializer.class=kafka.serializer.DefaultEncoder
-
# serializer.class=kafka.serializer.StringEncoder
-
# 如果要壓縮訊息,這裡指定哪些topic要壓縮訊息,預設empty,表示不壓縮。
-
#compressed.topics=
-
########### request ack ###############
-
# producer接收訊息ack的時機.預設為0.
-
# 0: producer不會等待broker傳送ack
-
# 1: 當leader接收到訊息之後傳送ack
-
# 2: 當所有的follower都同步訊息成功後傳送ack.
-
request.required.acks=0
-
# 在向producer傳送ack之前,broker允許等待的最大時間
-
# 如果超時,broker將會向producer傳送一個error ACK.意味著上一次訊息因為某種
-
# 原因未能成功(比如follower未能同步成功)
-
request.timeout.ms=10000
-
########## end #####################
-
# 同步還是非同步傳送訊息,預設“sync”表同步,"async"表非同步。非同步可以提高發送吞吐量,
-
# 也意味著訊息將會在本地buffer中,並適時批量傳送,但是也可能導致丟失未傳送過去的訊息
-
producer.type=sync
-
############## 非同步傳送 (以下四個非同步引數可選) ####################
-
# 在async模式下,當message被快取的時間超過此值後,將會批量傳送給broker,預設為5000ms
-
# 此值和batch.num.messages協同工作.
-
queue.buffering.max.ms = 5000
-
# 在async模式下,producer端允許buffer的最大訊息量
-
# 無論如何,producer都無法儘快的將訊息傳送給broker,從而導致訊息在producer端大量沉積
-
# 此時,如果訊息的條數達到閥值,將會導致producer端阻塞或者訊息被拋棄,預設為10000
-
queue.buffering.max.messages=20000
-
# 如果是非同步,指定每次批量傳送資料量,預設為200
-
batch.num.messages=500
-
# 當訊息在producer端沉積的條數達到"queue.buffering.max.meesages"後
-
# 阻塞一定時間後,佇列仍然沒有enqueue(producer仍然沒有傳送出任何訊息)
-
# 此時producer可以繼續阻塞或者將訊息拋棄,此timeout值用於控制"阻塞"的時間
-
# -1: 無阻塞超時限制,訊息不會被拋棄
-
# 0:立即清空佇列,訊息被拋棄
-
queue.enqueue.timeout.ms=-1
-
################ end ###############
-
# 當producer接收到error ACK,或者沒有接收到ACK時,允許訊息重發的次數
-
# 因為broker並沒有完整的機制來避免訊息重複,所以當網路異常時(比如ACK丟失)
-
# 有可能導致broker接收到重複的訊息,預設值為3.
-
message.send.max.retries=3
-
# producer重新整理topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況
-
# 因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即重新整理
-
# (比如topic失效,partition丟失,leader失效等),此外也可以通過此引數來配置額外的重新整理機制,預設值600000
- topic.metadata.refresh.interval.ms=60000
-
import java.util.*;
-
import kafka.javaapi.producer.Producer;
-
import kafka.producer.KeyedMessage;
-
import kafka.producer.ProducerConfig;
-
public class TestProducer {
-
public static void main(String[] args) {
-
long events = Long.parseLong(args[0]);
-
Random rnd = new Random();
-
Properties props = new Properties();
-
props.put("metadata.broker.list", "192.168.2.105:9092");
-
props.put("serializer.class", "kafka.serializer.StringEncoder"); //預設字串編碼訊息
-
props.put("partitioner.class", "example.producer.SimplePartitioner");
-
props.put("request.required.acks", "1");
-
ProducerConfig config = new ProducerConfig(props);
-
Producer<String, String> producer = new Producer<String, String>(config);
-
for (long nEvents = 0; nEvents < events; nEvents++) {
-
long runtime = new Date().getTime();
-
String ip = “192.168.2.” + rnd.nextInt(255);
-
String msg = runtime + “,www.example.com,” + ip;
-
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
-
producer.send(data);
-
}
-
producer.close();
-
}
-
}
2.2 指定關鍵字key,傳送訊息到指定partitions
說明:如果需要實現自定義partitions訊息傳送,需要實現Partitioner介面
-
public class CustomizePartitioner implements Partitioner {
-
public CustomizePartitioner(VerifiableProperties props) {
-
}
-
/**
-
* 返回分割槽索引編號
-
* @param key sendMessage時,輸出的partKey
-
* @param numPartitions topic中的分割槽總數
-
* @return
-
*/
-
@Override
-
public int partition(Object key, int numPartitions) {
-
System.out.println("key:" + key + " numPartitions:" + numPartitions);
-
String partKey = (String)key;
-
if ("part2".equals(partKey))
-
return 2;
-
// System.out.println("partKey:" + key);
-
........
-
........
-
return 0;
-
}
- }
3.consumer程式開發例子
3.1 consumer引數說明
-
# zookeeper連線伺服器地址,此處為線下測試環境配置(kafka訊息服務-->kafka broker叢集線上部署環境wiki)
-
# 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-
zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka
-
# zookeeper的session過期時間,預設5000ms,用於檢測消費者是否掛掉,當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡
-
zookeeper.session.timeout.ms=5000
-
zookeeper.connection.timeout.ms=10000
-
# 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次獲得的訊息。一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的訊息
-
zookeeper.sync.time.ms=2000
-
#指定消費組
-
group.id=xxx
-
# 當consumer消費一定量的訊息之後,將會自動向zookeeper提交offset資訊
-
# 注意offset資訊並不是每消費一次訊息就向zk提交一次,而是現在本地儲存(記憶體),並定期提交,預設為true
-
auto.commit.enable=true
-
# 自動更新時間。預設60 * 1000
-
auto.commit.interval.ms=1000
-
# 當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤訊息消費情況,便於觀察
-
conusmer.id=xxx
-
# 消費者客戶端編號,用於區分不同客戶端,預設客戶端程式自動產生
-
client.id=xxxx
-
# 最大取多少塊快取到消費者(預設10)
-
queued.max.message.chunks=50
-
# 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新
-
# 的consumer上,如果一個consumer獲得了某個partition的消費許可權,那麼它將會向zk註冊
-
# "Partition Owner registry"節點資訊,但是有可能此時舊的consumer尚沒有釋放此節點,
-
# 此值用於控制,註冊節點的重試次數.
-
rebalance.max.retries=5
-
# 獲取訊息的最大尺寸,broker不會像consumer輸出大於此值的訊息chunk
-
# 每次feth將得到多條訊息,此值為總大小,提升此值,將會消耗更多的consumer端記憶體
-
fetch.min.bytes=6553600
-
# 當訊息的尺寸不足時,server阻塞的時間,如果超時,訊息將立即傳送給consumer
-
fetch.wait.max.ms=5000
-
socket.receive.buffer.bytes=655360
-
# 如果zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、
-
# anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。預設largest
-
auto.offset.reset=smallest
-
# 指定序列化處理類(mafka client API呼叫說明-->3.序列化約定wiki),預設為kafka.serializer.DefaultDecoder,即byte[]
- derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder
3.2 多執行緒並行消費topic
ConsumerTest類
-
import kafka.consumer.ConsumerIterator;
-
import kafka.consumer.KafkaStream;
-
public class ConsumerTest implements Runnable {
-
private KafkaStream m_stream;
-
private int m_threadNumber;
-
public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
-
m_threadNumber = a_threadNumber;
-
m_stream = a_stream;
-
}
-
public void run() {
-
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
-
while (it.hasNext())
-
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
-
System.out.println("Shutting down Thread: " + m_threadNumber);
-
}
- }
ConsumerGroupExample類
-
import kafka.consumer.ConsumerConfig;
-
import kafka.consumer.KafkaStream;
-
import kafka.javaapi.consumer.ConsumerConnector;
-
import java.util.HashMap;
-
import java.util.List;
-
import java.util.Map;
-
import java.util.Properties;
-
import java.util.concurrent.ExecutorService;
-
import java.util.concurrent.Executors;
-
public class ConsumerGroupExample {
-
private final ConsumerConnector consumer;
-
private final String topic;
-
private ExecutorService executor;
-
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
-
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
-
createConsumerConfig(a_zookeeper, a_groupId));
-
this.topic = a_topic;
-
}
-
public void shutdown() {
-
if (consumer != null) consumer.shutdown();
-
if (executor != null) executor.shutdown();
-
}
-
public void run(int a_numThreads) {
-
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-
topicCountMap.put(topic, new Integer(a_numThreads));
-
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
-
// 啟動所有執行緒
-
executor = Executors.newFixedThreadPool(a_numThreads);
-
// 開始消費訊息
-
int threadNumber = 0;
-
for (final KafkaStream stream : streams) {
-
executor.submit(new ConsumerTest(stream, threadNumber));
-
threadNumber++;
-
}
-
}
-
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
-
Properties props = new Properties();
-
props.put("zookeeper.connect", "192.168.2.225:2183/config/mobile/mq/mafka");
-
props.put("group.id", "push-token");
-
props.put("zookeeper.session.timeout.ms", "60000");
-
props.put("zookeeper.sync.time.ms", "2000");
-
props.put("auto.commit.interval.ms", "1000");
-
return new ConsumerConfig(props);
-
}
-
public static void main(String[] args) {
-
String zooKeeper = args[0];
-
String groupId = args[1];
-
String topic = args[2];
-
int threads = Integer.parseInt(args[3]);
-
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
-
example.run(threads);
-
try {
-
Thread.sleep(10000);
-
} catch (InterruptedException ie) {
-
}
-
example.shutdown();
-
}
- }
總結:
kafka消費者api分為high api和low api,目前上述demo是都是使用kafka high api,高階api不用關心維護消費狀態資訊和負載均衡,系統會根據配置引數,
定期flush offset到zk上,如果有多個consumer且每個consumer建立了多個執行緒,高階api會根據zk上註冊consumer資訊,進行自動負載均衡操作。
注意事項:
1.高階api將會內部實現持久化每個分割槽最後讀到的訊息的offset,資料儲存在zookeeper中的消費組名中(如/consumers/push-token-group/offsets/push-token/2。
其中push-token-group是消費組,push-token是topic,最後一個2表示第3個分割槽),每間隔一個(預設1000ms)時間更新一次offset,
那麼可能在重啟消費者時拿到重複的訊息。此外,當分割槽leader發生變更時也可能拿到重複的訊息。因此在關閉消費者時最好等待一定時間(10s)然後再shutdown()
2.消費組名是一個全域性的資訊,要注意在新的消費者啟動之前舊的消費者要關閉。如果新的程序啟動並且消費組名相同,kafka會新增這個程序到可用消費執行緒組中用來消費
topic和觸發重新分配負載均衡,那麼同一個分割槽的訊息就有可能傳送到不同的程序中。
3.如果消費者組中所有consumer的匯流排程數量大於分割槽數,一部分執行緒或某些consumer可能無法讀取訊息或處於空閒狀態。
4.如果分割槽數多於執行緒數(如果消費組中執行者多個消費者,則執行緒數為消費者組內所有消費者執行緒總和),一部分執行緒會讀取到多個分割槽的訊息
5.如果一個執行緒消費多個分割槽訊息,那麼接收到的訊息是不能保證順序的。
備註:可用zookeeper web ui工具管理檢視zk目錄樹資料: xxx/consumers/push-token-group/owners/push-token/2其中
push-token-group為消費組,push-token為topic,2為分割槽3.檢視裡面的內容如:
push-token-group-mobile-platform03-1405157976163-7ab14bd1-0表示該分割槽被該標示的執行緒所執行。
總結:
producer效能優化:非同步化,訊息批量傳送,具體瀏覽上述引數說明。consumer效能優化:如果是高吞吐量資料,設定每次拿取訊息(fetch.min.bytes)大些,
拿取訊息頻繁(fetch.wait.max.ms)些(或時間間隔短些),如果是低延時要求,則設定時間時間間隔小,每次從kafka broker拿取訊息儘量小些。