
Compiling the Kafka 2.6 Source in IDEA

My current project is integrating Kafka and Flink, so I took some time to study the Kafka source code and build it myself. I hit quite a few pitfalls along the way.

1. Install the JDK

2. Install Scala

3. Install Gradle


4. Start ZooKeeper locally

Download ZooKeeper from the official site and unzip it, then adjust the configuration: copy zoo_sample.cfg to zoo.cfg with the following contents:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=D:/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60

This is where I was stuck for a long time. I have a server with ZooKeeper installed on it, and the configuration in my local Kafka source kept pointing at that server, so the broker exited as soon as it started. At first I assumed it was a Kafka version problem and shelved it for quite a while; yesterday I tried a local ZooKeeper instead, and it worked.
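This failure mode (the broker exits immediately because zookeeper.connect points at an unreachable host) can be ruled out up front with a plain TCP probe. `ZkCheck` is a hypothetical helper using only the JDK, not part of Kafka:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Quick probe for the endpoint configured in zookeeper.connect.
public class ZkCheck {

    // Returns true only if a TCP connection can be established within 2 seconds.
    public static boolean isReachable(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 2000);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String msg = isReachable("localhost", 2181)
                ? "ZooKeeper is reachable on localhost:2181"
                : "Nothing is listening on localhost:2181 -- the broker would exit on startup";
        System.out.println(msg);
    }
}
```

Running this before starting the broker tells you immediately whether the ZooKeeper address in server.properties is actually reachable.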

Then run the server's cmd start script:
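On Windows the start script lives under the distribution's bin directory; the install path below is an assumption, adjust it to wherever the archive was unpacked:

```
:: hypothetical install path; adjust to your unpack location
cd /d D:\apache-zookeeper-3.x-bin\bin
:: starts a standalone server on the clientPort from zoo.cfg (2181)
zkServer.cmd
```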

5. Generate the IDEA project for Kafka

Download the Kafka source from the official site (2.6.0 in my case), run `gradle idea` in the source directory, then install the Scala plugin in IDEA. After that, import the source:

  • Edit server.properties under config: change the Kafka log location and the ZooKeeper connection settings.

  • Move log4j.properties to kafka-2.6.0-src\core\src\main\resources\log4j.properties

  • Edit the build.gradle file, otherwise there is no log output. Lower versions don't need this change; I verified that with 0.10.0.1.

    project(':core') {
      println "Building project 'core' with Scala version ${versions.scala}"
    
      apply plugin: 'scala'
      apply plugin: "org.scoverage"
      archivesBaseName = "kafka_${versions.baseScala}"
    
      dependencies {
        compile project(':clients')
        compile libs.jacksonDatabind
        compile libs.jacksonModuleScala
        compile libs.jacksonDataformatCsv
        compile libs.jacksonJDK8Datatypes
        compile libs.joptSimple
        compile libs.metrics
        compile libs.scalaCollectionCompat
        compile libs.scalaJava8Compat
        compile libs.scalaLibrary
        // only needed transitively, but set it explicitly to ensure it has the same version as scala-library
        compile libs.scalaReflect
        compile libs.scalaLogging
        compile libs.slf4jApi
        compile libs.slf4jlog4j
        compile libs.log4j
        compile(libs.zookeeper) {
    //      exclude module: 'slf4j-log4j12'
    //      exclude module: 'log4j'
        }
        // ZooKeeperMain depends on commons-cli but declares the dependency as `provided`
        compile libs.commonsCli
    
        compileOnly libs.log4j
    
        testCompile project(':clients').sourceSets.test.output
        testCompile libs.bcpkix
        testCompile libs.mockitoCore
        testCompile libs.easymock
        testCompile(libs.apacheda) {
          exclude group: 'xml-apis', module: 'xml-apis'
          // `mina-core` is a transitive dependency for `apacheds` and `apacheda`.
          // It is safer to use from `apacheds` since that is the implementation.
          exclude module: 'mina-core'
        }
        testCompile libs.apachedsCoreApi
        testCompile libs.apachedsInterceptorKerberos
        testCompile libs.apachedsProtocolShared
        testCompile libs.apachedsProtocolKerberos
        testCompile libs.apachedsProtocolLdap
        testCompile libs.apachedsLdifPartition
        testCompile libs.apachedsMavibotPartition
        testCompile libs.apachedsJdbmPartition
        testCompile libs.junit
        testCompile libs.scalatest
        testCompile libs.slf4jlog4j
        testCompile libs.jfreechart
      }
    }
    
  • Set up the run configuration

  • Start the broker
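The server.properties edits from the first bullet boil down to two keys; the data directory below is an assumption for a local Windows setup:

```
# local directory for partition data (this is log.dirs, not the log4j output)
log.dirs=D:/kafka-logs
# must point at the locally started ZooKeeper from step 4
zookeeper.connect=localhost:2181
```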
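The run configuration itself is not spelled out above; a typical IDEA setup for launching the broker from source looks roughly like this (the module name is an assumption, check your imported project):

```
Module / classpath:  core (kafka-2.6.0-src)
Main class:          kafka.Kafka
Program arguments:   config/server.properties
Working directory:   the root of the kafka-2.6.0-src checkout
```

With log4j.properties moved into core/src/main/resources as described above, it is picked up from the classpath and no extra VM options should be needed for logging.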

6. Run a test
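The consumer below notes that its topic was created beforehand. With the scripts shipped in the Kafka distribution, creating the `dalianpai` topic against the running broker might look like this (single-broker values assumed):

```
:: Windows scripts live under bin\windows of the distribution
bin\windows\kafka-topics.bat --create --topic dalianpai --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
```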

Producer

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFastStart {

    // Kafka cluster address
    private static final String brokerList = "localhost:9092";
    // Topic name
    private static final String topic = "dalianpai";

    public static void main(String[] args) {
        Properties properties = new Properties();

        // Configure the serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Number of retries
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        // Build the interceptor chain
        List<String> interceptors = new ArrayList<>();
        interceptors.add(CounterInterceptor.class.getName());
        interceptors.add(TimeInterceptor.class.getName());
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Send the messages
        for (int i = 0; i < 11; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!" + i);
            producer.send(record);
        }

        producer.close();
    }
}

Consumer

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerFastStart {

    // Kafka cluster address
    private static final String brokerList = "localhost:9092";
    // Topic name (created beforehand)
    private static final String topic = "dalianpai";
    // Consumer group
    private static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();

        // Configure the deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

Test results: