kafka入門一:安裝與使用
Kafak安裝與使用
一、前言
kafka是Apache平臺下的一種分散式釋出/訂閱訊息系統,也就是訊息中介軟體。在之前我使用的是ActiveMQ,初次接觸Kafka,先從最基本的路數走起,後續再進行深入的學習。
二、Kafka下載與安裝
Kafka版本:1.0.0
2.1 下載
下載地址:
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.12-1.0.0.tgz
以上為下載地址
2.2 linux下載命令,拿其中之一舉例
$> wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz
2.3 單機安裝
解壓、安裝
$> mv kafka_2.12-1.0.0.tgz /usr/local
$> tar -zxf kafka_2.12-1.0.0.tgz
Kafka強依賴於ZooKeeper,啟動Kafka必須先啟動ZooKeeper。單機時在configure/server.properties中的預設配置為zookeeper.connect=localhost:2181,暫不用作修改,直連本機ZooKeeper即可
啟動服務(後臺)
$> ./bin/kafka-server-start.sh ./config/server.properties &
停止時,先停止Kafka,再停止ZooKeeper
$> ./bin/kafka-server-stop.sh
$> cd /usr/local/zookeeper-3.4.11/
$> ./bin/zkServer.sh stop
測試
建立topic。建立一個名為“test”的topic,他只有一個分割槽--partition,一個副本--replication-factor
$> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
驗證topic是否建立成功
$> ./bin/kafka-topics.sh --list --zookeeper localhost:2181
test
在Console模式下,啟動producer傳送訊息(ctrl c退出Console模式)
$> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> love
> and
> peace
在Console模式下,啟動consumer消費訊息
$> ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
love
and
peace
在使用Console時,會有如下warning。提示是可以直連borker-list的
Using the ConsoleConsumer/Producer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
$> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
2.4 叢集部署
2.5 Kafka監控工具——KafkaOffsetMonitor安裝部署
請自行百度安裝包下載,需要注意一點的是,在jar包中有些js檔案需要科學上網才能訪問,相對應也有本地化修改的版本,我沒有找到,如果你有,請聯絡我(正經臉)!
安裝部署:
KafkaOffsetMonitor的所有執行資源已經打包為一個jar檔案,我們可以新建一個單獨的目錄存放monitor檔案。在同目錄下,編寫啟動指令碼:
vim monitorStart.sh
輸入以下內容
#! /bin/bash
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \ # 指明執行Web監控的類
--zk localhost:2181 \ # 連線的ZooKeeper伺服器地址
--port 8080 \ # 監控器Web執行埠
--refresh 10.seconds \ # 頁面資料重新整理時間
--retain 1.days # 頁面資料保留時間
更改sh檔案可執行許可權
chmod u+x monitorStart.sh
後臺啟動監控程式,即可訪問(注意是否該科學上網)
nohup ./monitorStart.sh &
訪問虛擬機器伺服器地址,我的是192.168.81.129:8080,如下,第一次訪問時為空白,進行一次消費即可
三、Java客戶端
首先在kafka伺服器上新生成一個topic
$> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic java-topic_test
Java端訊息生產者
public class JavaKafkaProducer {
private Logger logger = Logger.getLogger("JavaKafkaProducer");
/**
* 設定例項生產訊息的總數
*/
private static final int MSG_SIZE = 10;
/**
* 主題名稱
*/
public static final String TOPIC = "java-topic_test";
/**
* kafka伺服器節點
*/
private static final String BROKER_LIST = "192.168.81.129:9092";
private static KafkaProducer<String,String> producer = null;
static {
Properties configs = initConfig();
producer = new KafkaProducer<String, String>(configs);
}
// 1.kafka producer引數設定
private static Properties initConfig() {
Properties props = new Properties();
// broker列表
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
// 設定序列化的類
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
/**
* 0 不等待結果返回
* 1 等待至少有一個伺服器返回資料接收標識
* -1或all 表示必須接受到所有的伺服器返回標識,及同步寫入
*/
props.put("request.required.acks", "0");
/**
* 內部發送資料是非同步還是同步
* sync 同步,預設
* async非同步
*/
props.put("producer.type", "async");
// bootstrap.servers地址,必須指定
props.put("bootstrap.servers", "192.168.81.129:9092");
// 設定分割槽類,可以使用自定義分割槽類
// props.put("partitioner.class", "JavaKafkaProducerPartitioner");
// 延遲傳送時間
// props.put("linger.ms","1");
// 重試次數
props.put("message.send.max.retries", 0);
// 非同步提交的時候(async),併發提交的記錄數
props.put("batch.num.message", 200);
// 設定緩衝區大小,預設10kb
props.put("send.buffere.bytes", "102400");
return props;
}
/**
* 產生一個訊息
*/
private static String generateMessage() {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 5; i++) {
sb.append((new Random()).nextInt(20)).append(" ");
}
return sb.toString();
}
public static void main(String[] args) {
ProducerRecord<String, String> record = null;
String message = null;
try {
int num = 0;
for (int i = 0; i<MSG_SIZE; i++) {
message = generateMessage();
System.out.println(message);
record = new ProducerRecord<String, String>(TOPIC,"SCDN", message);
producer.send(record, new Callback() {
public void onCompletion (RecordMetaData recordMetadata, Exception e) {
if(e != null) {
e.printStackTrace();
System.out.println("傳送訊息失敗!");
} else {
System.out.println("傳送訊息成功!");
}
}
);
if(num++ % 10 == 0) {
Thread.sleep(2000);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (producer != null) {
producer.close();
producer = null;
}
}
}
}
自定義分割槽器
public class JavaKafkaProducerPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
/**
* 無參構造
*/
public JavaKafkaProducerPartitioner() {
this(new VerifiableProperties());
}
/**
* 建構函式,必須給定
*
* @param properties
*/
public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
// nothing
}
private static int toPositive(int number) {
return number & 0x7fffffff;
}
// 分割槽方法
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 除錯使用
System.out.println("key is " + key);
System.out.println("value is " + new String(valueBytes)); // 此方法和下面的方法都是列印value的值
System.out.println("value is " + value);
return new Random().nextInt(100) % numPartitions; // 返回分割槽
}
public void close() {
}
public void configure(Map<String, ?> map) {
}
}
在kafka伺服器中使用如下命令檢視相應主題下的offset偏移量日誌
$> cd $kafka
$> ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/java-topic_test-0/00000000000000000000.log --print-data-log
Java端消費者
public class JavaKafkaConsumer {
static Properties props = new Properties();
static {
props.put("bootstrap.servers", "192.168.81.129:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
public static void main(String[] args) {
// 1.初始化消費者
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 2.訂閱主題,指定一個監聽器,用於在消費者發生平衡操作時回撥響應的業務處理
consumer.subscribe(Arrays.asList("java-topic_test"), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
consumer.commitAsync(); // 提交偏移量
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 獲取該分割槽下已消費的偏移量
long commitedOffset = -1;
for (TopicPartition topicPartition : partitions) {
// 獲取該分割槽下已消費的偏移量
commitedOffset = consumer.committed(topicPartition).offset();
// 重置偏移量到上一次提交的偏移量下一個位置處開始消費
consumer.seek(topicPartition, commitedOffset + 1);
}
}
});
try {
while (true) {
// 長輪詢拉取訊息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String , String> record : records) {
System.out.printf("partition = %d, offset = %d,key= %s value = %s%n",
record.partition(), record.offset(),
record.key(),record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
ProducerConsole
121411218 key is CSDN0 value is 121411218 partition is 3 ----------- 訊息傳送成功!
ComsumerConsole
partition = 3, offset = 42,key= CSDN0 value = 121411218
成功生產和消費