jafka環境搭建步驟--例項可用
1 搭建例項
A 下載包
B 修改 jafka.conf修改 jafka_home修改絕對路徑
set.JAFKA_HOME=/opt/apps/jafka-1.2.1
我發現不改也沒事
C 執行jafkamq
sh ./bin/server.sh conf/server.properties
d 執行和停止
./run.sh start
…../ console
./run.sh stop
E java程式碼—非常重要埠問題
預設埠放在了conf/server.properties檔案下的埠
預設應該都是9002,
package com;
import java.util.Properties;
import com.sohu.jafka.producer.Producer;
importcom.sohu.jafka.producer.ProducerConfig;
importcom.sohu.jafka.producer.StringProducerData;
importcom.sohu.jafka.producer.serializer.StringEncoder;
public class JJProducer {
publicstatic void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("broker.list","0:10.16.xxx.xx:9002");
props.put("serializer.class",StringEncoder.class.getName());
ProducerConfig config = newProducerConfig(props);
Producer<String, String> producer =new Producer<String, String>(config);
StringProducerData data = newStringProducerData("demo");
for(int i=0;i<20;i++) {
data.add("Hello world #"+i);
}
try {
long start =System.currentTimeMillis();
for (int i = 0; i < 20; i++) {
producer.send(data);
}
long cost = System.currentTimeMillis()- start;
System.out.println("send 20message cost: "+cost+" ms");
} finally {
producer.close();
}
}
}
package com;
import com.sohu.jafka.api.FetchRequest;
import com.sohu.jafka.consumer.SimpleConsumer;
importcom.sohu.jafka.message.ByteBufferMessageSet;
importcom.sohu.jafka.message.MessageAndOffset;
import com.sohu.jafka.utils.Utils;
public class JJConsumer {
publicstatic void main(String[] args) throws Exception {
SimpleConsumerconsumer = new SimpleConsumer("10.16.xxx.xx", 9002);
//
longoffset = 0;
while(true) {
FetchRequest request = newFetchRequest("demo", 0, offset);
ByteBufferMessageSet set = consumer.fetch(request);
L.l.info("set size"+set);
for (MessageAndOffset msg : set) {
System.out.println(Utils.toString(msg.message.payload(),"UTF-8"));
offset = msg.offset;
L.l.info("message:"+Utils.toString(msg.message.payload(),"UTF-8"));
}
}
}
}
2 搭建以zookeeper來整合
其他一樣,
首先先要啟動zookeeper的,
B :事實上這句啟動會錯誤,而且根本不需要,直接傳送接收
首先要zookeeper已經建立好
$bin/zookeeper-server.sh conf/zookeeper.properties
C:進行傳送
bin/producer-console.sh --zookeeper 1.1.1.1:2184 --topic demo
D:進行接收
sh ./bin/consumer-console.sh --zookeeper 1.1.1.1:2184 --topic demo --from-beginning
3 kafka支援流了,就是輸入是topic,輸出也是是個jar包,和其他流框架有些類似
4 kafka connect快速實現資料進出,否則需要依賴於其他控制元件
5 支援adminClient api,:::
The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects.,
可以參考的 增刪 topic
6 配置屬性可程式設計和 在配置檔案中寫:
7 kafk 的監控機制:
8 問題,有時間要理解常見的概念