1. 程式人生 > >kafka本地測試環境搭建

kafka本地測試環境搭建

需求

由於公有雲的kafka叢集只對測試機(阡陌機器等)開放,本地是無法訪問的,所以為了開發方便,搭建一套本地的kafka測試環境是有必要的

軟體

  • kafka_2.11-0.10.0.1

步驟

根據開發環境建立好相應的配置檔案,然後開始啟動各個元件

本地zk啟動

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

啟動broker節點

JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &
JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &

建立topic(如果已經存在就無需建立;注意:後面的生產者、消費者以及示例程式碼使用的topic是doctorq,建立時請把--topic換成實際要使用的名稱)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test

檢視當前topic列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

啟動生產者

bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic doctorq

啟動消費者

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic doctorq

演示

模擬kafka傳送日誌資訊

因為huatuo接收到的資料並非簡單的String格式,而是複雜的二進位制格式,所以我們要在本地能夠模擬出這樣的資料

序列化的格式

{
    "type": "record",
    "name": "Event","namespace":"com.iwaimai.huatuo.log",
    "fields": [{
   "name": "headers"
, "type": { "type": "map", "values": "string" } }, { "name": "body", "type": "string" }]
}

程式碼

package com.iwaimai.huatuo.kafka;

import junit.framework.TestCase;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Created by doctorq on 16/9/22.
 */
public class KafkaProducer extends TestCase {
    private byte[] serializedBytes;
    private GenericRecord payload;
    private DatumWriter<GenericRecord> writer;
    private BinaryEncoder encoder;
    private ByteArrayOutputStream out;
    private Producer<String, byte[]> producer;

    public void setUp() throws Exception {
        // 設定配置屬性
        Properties props = new Properties();
        props.put("metadata.broker.list","localhost:9092,localhost:9093,localhost:9094");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        // key.serializer.class預設為serializer.class
        // 觸發acknowledgement機制,否則是fire and forget,可能會引起資料丟失
        // 值為0,1,-1,可以參考
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, byte[]>(config);
        Schema schema = new Schema.Parser().parse(new File("src/test/resources/test-schema.avsc"));
        payload = new GenericData.Record(schema);
        writer = new SpecificDatumWriter<GenericRecord>(schema);
        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder(out, null);
    }

    /**
     * 模擬傳送Nginx日誌
     * @throws Exception
     */
    @Test
    public void testNginxProducer() throws Exception {
        Map headers = new LinkedHashMap<String, String>();
        headers.put("timestamp", "2016-9-22 16:8:26");
        headers.put("module", "NGINX");
        headers.put("filename", "access_log.2016092216");
        headers.put("topic", "pids-0000000038");
        headers.put("host", "10.194.216.46");
        headers.put("lineoffset", "33653");
        headers.put("inode", "46274724");
        payload.put("headers", headers);
        payload.put("body", "10.194.219.31 - - [22/Sep/2016:16:08:26 +0800] \"POST /marketing/getshopactivity HTTP/1.1\" 200 518 \"-\" \"-\" \"RAL/2.0.8.6 (internal request)\" 0.021 506573372 - 10.194.217.47 unix:/home/map/odp_cater/var/php-cgi.sock 10.194.217.47 \"-\" waimai waimai 5065733720802800138092216 1474531706.602 0.021 - 10.194.219.31 logid=506573370 spanid=0.8 force_sampling=- status=200 host=10.194.217.47 server_addr=10.194.217.47 server_port=8086 client_addr=10.194.219.31 request=\"POST /marketing/getshopactivity HTTP/1.1\" msec=1474531706.602 request_time=0.021 content_tracing=-");
        System.out.println("Original Message : "+ payload);
    }

    /**
     * 模擬傳送RAL日誌
     * @throws Exception
     */
    @Test
    public void testRalProducer() throws Exception {
        Map headers = new LinkedHashMap<String, String>();
        headers.put("timestamp", "2016-9-23 10:53:14");
        headers.put("module", "RAL");
        headers.put("filename", "ral-worker.log.2016092310");
        headers.put("topic", "pids-0000000043");
        headers.put("host", "10.195.181.17");
        headers.put("lineoffset", "2557660");
        headers.put("inode", "144277667");

        payload.put("headers", headers);
        payload.put("body", "NOTICE: 09-23 10:53:14: ral-worker * 4153 [php_ral.cpp:1437][logid=3194356520 worker_id=10488 optime=1474599194.518921 product=waimai subsys=waimai module=marketing user_ip= local_ip=10.195.181.17 local_port=8086 msg=ral_write_log log_type=E_SUM caller=redis_c_marketing from=/home/map/odp_cater/php/phplib/wm/service/RedisBns.php:131 spanid=0.9.26 method=get conv=redis prot=redis retry=0%2F1 remote_ip=10.194.218.13%3A7490 idc=nj cost=0.181 talk=0.181 write=0 connect=0 read=0.181 req_start_time=1474599194.5186 err_no=0 err_info=OK req_data=a%3A1%3A%7Bi%3A0%3Bs%3A30%3A%22wm%3Amkt%3Abgt%3Afuse%3A201609%3A195%3A0%3A2%22%3B%7D]");
        System.out.println("Original Message : "+ payload);
    }

    public void tearDown() throws Exception {
        writer.write(payload, encoder);
        encoder.flush();
        out.close();
        serializedBytes = out.toByteArray();
        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("doctorq", serializedBytes);
        producer.send(message);
        producer.close();
    }

}

演示傳送