Compiling the Kafka 2.6 Source Code in IDEA
阿新 • Published: 2021-10-20
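Our project is about to integrate Kafka and Flink, so I have been using my spare time to study the Kafka source code. I compiled it locally and hit quite a few pitfalls along the way.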
1. Install the JDK
2. Install Scala
3. Install Gradle
4. Start ZooKeeper locally
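Download ZooKeeper from the official site, extract it, and adjust the configuration: copy zoo_sample.cfg to zoo.cfg and edit it so that it reads as follows: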
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=D:/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
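This is exactly where my problem had been. I have a server with ZooKeeper installed on it, and the configuration in my local Kafka source kept pointing at that server, so Kafka stopped as soon as it started. At first I assumed it was a Kafka version problem and left it alone for a long time. Yesterday I tried a local ZooKeeper instead, and it worked.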
Then run the cmd file to start ZooKeeper.
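Since the failure described above came down to Kafka not reaching the ZooKeeper it was configured for, it may be worth confirming that something is listening on the configured port before starting the broker. The following is only a minimal sketch: the class name ZkPortCheck and the method isReachable are made up for illustration, and localhost:2181 is assumed from the clientPort in the zoo.cfg above. It only tests that a TCP connection can be opened; it does not verify that the listener is actually ZooKeeper.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka or ZooKeeper.
public class ZkPortCheck {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults assume the local ZooKeeper from zoo.cfg (clientPort=2181)
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2181;
        boolean ok = isReachable(host, port, 2000);
        System.out.println(host + ":" + port + " reachable: " + ok);
    }
}
```

Running it with no arguments checks localhost:2181; passing a host and port checks a remote ZooKeeper instead, which is a quick way to tell a connectivity problem apart from a Kafka problem.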
5. Generate an IDEA project from the Kafka source
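Download the Kafka source from the official site (I used version 2.6.0), run gradle idea in the source directory, and install the Scala plugin in IDEA. Then import the source and work through the following steps: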
- Edit server.properties under config: set the Kafka log directory and the ZooKeeper connection settings.
- Move log4j.properties to kafka-2.6.0-src\core\src\main\resources\log4j.properties
- Modify build.gradle as shown here; otherwise no logs are printed. Older versions do not need this change; I tested with version 0.10.0.1.
project(':core') {
  println "Building project 'core' with Scala version ${versions.scala}"

  apply plugin: 'scala'
  apply plugin: "org.scoverage"
  archivesBaseName = "kafka_${versions.baseScala}"

  dependencies {
    compile project(':clients')
    compile libs.jacksonDatabind
    compile libs.jacksonModuleScala
    compile libs.jacksonDataformatCsv
    compile libs.jacksonJDK8Datatypes
    compile libs.joptSimple
    compile libs.metrics
    compile libs.scalaCollectionCompat
    compile libs.scalaJava8Compat
    compile libs.scalaLibrary
    // only needed transitively, but set it explicitly to ensure it has the same version as scala-library
    compile libs.scalaReflect
    compile libs.scalaLogging
    compile libs.slf4jApi
    compile libs.slf4jlog4j
    compile libs.log4j
    compile(libs.zookeeper) {
      // exclude module: 'slf4j-log4j12'
      // exclude module: 'log4j'
    }
    // ZooKeeperMain depends on commons-cli but declares the dependency as `provided`
    compile libs.commonsCli

    compileOnly libs.log4j

    testCompile project(':clients').sourceSets.test.output
    testCompile libs.bcpkix
    testCompile libs.mockitoCore
    testCompile libs.easymock
    testCompile(libs.apacheda) {
      exclude group: 'xml-apis', module: 'xml-apis'
      // `mina-core` is a transitive dependency for `apacheds` and `apacheda`.
      // It is safer to use from `apacheds` since that is the implementation.
      exclude module: 'mina-core'
    }
    testCompile libs.apachedsCoreApi
    testCompile libs.apachedsInterceptorKerberos
    testCompile libs.apachedsProtocolShared
    testCompile libs.apachedsProtocolKerberos
    testCompile libs.apachedsProtocolLdap
    testCompile libs.apachedsLdifPartition
    testCompile libs.apachedsMavibotPartition
    testCompile libs.apachedsJdbmPartition
    testCompile libs.junit
    testCompile libs.scalatest
    testCompile libs.slf4jlog4j
    testCompile libs.jfreechart
  }
  // (the excerpt ends here; the rest of the project(':core') block is not shown)
- Modify the startup parameters.
- Start the broker.
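Because a wrong ZooKeeper address in server.properties is what cost me the most time, a small sanity check on that file before launching can help. The sketch below is an illustration only: ServerPropertiesCheck and check are names invented here, and the default path config/server.properties assumes the program is run from the root of the Kafka source tree. The keys zookeeper.connect and log.dirs are the broker settings that the first step above changes.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka.
public class ServerPropertiesCheck {

    /** Returns one warning per missing setting; an empty list means both are present. */
    public static List<String> check(Reader source) throws IOException {
        Properties props = new Properties();
        props.load(source);

        List<String> warnings = new ArrayList<>();
        for (String key : new String[] {"zookeeper.connect", "log.dirs"}) {
            String value = props.getProperty(key, "").trim();
            if (value.isEmpty()) {
                warnings.add(key + " is not set");
            } else {
                // Print the effective value so a stale remote address is easy to spot
                System.out.println(key + " = " + value);
            }
        }
        return warnings;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location, relative to the Kafka source root
        String path = args.length > 0 ? args[0] : "config/server.properties";
        try (Reader reader = Files.newBufferedReader(Paths.get(path))) {
            for (String warning : check(reader)) {
                System.out.println("WARNING: " + warning);
            }
        }
    }
}
```

One detail worth knowing on Windows: java.util.Properties treats the backslash as an escape character, so a path written with single backslashes is not read back literally. Writing paths with forward slashes, as dataDir=D:/zookeeper does in zoo.cfg, avoids that.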
6. Test
Producer
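import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFastStart {
    // Kafka cluster address
    private static final String brokerList = "localhost:9092";
    // Topic name
    private static final String topic = "dalianpai";

    public static void main(String[] args) {
        // 1. Configure the producer
        Properties properties = new Properties();
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Number of retries
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        // 2. Build the interceptor chain.
        // CounterInterceptor and TimeInterceptor are custom interceptor classes
        // that are not shown in this post; they must be on the classpath.
        List<String> interceptors = new ArrayList<>();
        interceptors.add(CounterInterceptor.class.getName());
        interceptors.add(TimeInterceptor.class.getName());
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 3. Send the messages (11 records, all with the same key)
        for (int i = 0; i < 11; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!" + i);
            producer.send(record);
        }

        // close() waits for outstanding sends to complete before returning
        producer.close();
    }
}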
Consumer
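import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerFastStart {
    // Kafka cluster address
    private static final String brokerList = "localhost:9092";
    // Topic name (created beforehand)
    private static final String topic = "dalianpai";
    // Consumer group
    private static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Key and value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Use ConsumerConfig here rather than ProducerConfig; the key is "bootstrap.servers" in both
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        // Poll forever and print each record's value; stop the process to exit
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}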
Test result: