Setting Up a Local Kafka Test Environment
阿新 · Published 2019-02-19
Requirements
The public-cloud Kafka cluster is only reachable from test machines (阡陌 hosts and the like) and cannot be accessed locally, so setting up a local Kafka test environment is worthwhile for development.
Software
- kafka_2.11-0.10.0.1
Steps
Create the appropriate configuration files for your environment, then start each component.
Start the local ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
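The zookeeper.properties shipped with Kafka is already suitable for a single-node test setup; it amounts to the following (the data directory is the stock default and can be changed):

# config/zookeeper.properties -- single-node defaults
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0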
Start the broker nodes
JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &
JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
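server-1.properties and server-2.properties are copies of server.properties in which a few settings must be unique per broker. A minimal sketch of the overrides for server-1.properties, assuming it is the broker listening on port 9093 (the broker.id, port, and log directory shown are illustrative, not from the original post):

# server-1.properties -- per-broker overrides (illustrative values)
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-1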
Create the topic (skip this step if it already exists)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic doctorq
View the current list of topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
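To check the replication factor and partition assignment of the new topic, the same script can describe it:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic doctorq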
Start a console producer
bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic doctorq
Start a console consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic doctorq
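With both consoles running, each line typed into the producer terminal should appear in the consumer terminal, confirming the three-broker cluster works end to end.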
Demo
Simulating log messages sent to Kafka
The data that huatuo receives is not a plain String but a complex binary format, so we need to be able to produce that kind of data locally.
The Avro serialization schema:
{
    "type": "record",
    "name": "Event",
    "namespace": "com.iwaimai.huatuo.log",
    "fields": [{
        "name": "headers",
        "type": {
            "type": "map",
            "values": "string"
        }
    }, {
        "name": "body",
        "type": "string"
    }]
}
Code
package com.iwaimai.huatuo.kafka;

import junit.framework.TestCase;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Created by doctorq on 16/9/22.
 */
public class KafkaProducer extends TestCase {
    private byte[] serializedBytes;
    private GenericRecord payload;
    private DatumWriter<GenericRecord> writer;
    private BinaryEncoder encoder;
    private ByteArrayOutputStream out;
    private Producer<String, byte[]> producer;

    public void setUp() throws Exception {
        // Producer configuration
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092,localhost:9093,localhost:9094");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        // key.serializer.class defaults to serializer.class
        // Request acknowledgements; otherwise sends are fire-and-forget and data may be lost.
        // Valid values are 0, 1 and -1; see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, byte[]>(config);

        // Parse the Avro schema and set up the record, writer and encoder
        Schema schema = new Schema.Parser().parse(new File("src/test/resources/test-schema.avsc"));
        payload = new GenericData.Record(schema);
        writer = new SpecificDatumWriter<GenericRecord>(schema);
        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder(out, null);
    }

    /**
     * Simulates sending an Nginx access log entry.
     * @throws Exception
     */
    @Test
    public void testNginxProducer() throws Exception {
        Map<String, String> headers = new LinkedHashMap<String, String>();
        headers.put("timestamp", "2016-9-22 16:8:26");
        headers.put("module", "NGINX");
        headers.put("filename", "access_log.2016092216");
        headers.put("topic", "pids-0000000038");
        headers.put("host", "10.194.216.46");
        headers.put("lineoffset", "33653");
        headers.put("inode", "46274724");
        payload.put("headers", headers);
        payload.put("body", "10.194.219.31 - - [22/Sep/2016:16:08:26 +0800] \"POST /marketing/getshopactivity HTTP/1.1\" 200 518 \"-\" \"-\" \"RAL/2.0.8.6 (internal request)\" 0.021 506573372 - 10.194.217.47 unix:/home/map/odp_cater/var/php-cgi.sock 10.194.217.47 \"-\" waimai waimai 5065733720802800138092216 1474531706.602 0.021 - 10.194.219.31 logid=506573370 spanid=0.8 force_sampling=- status=200 host=10.194.217.47 server_addr=10.194.217.47 server_port=8086 client_addr=10.194.219.31 request=\"POST /marketing/getshopactivity HTTP/1.1\" msec=1474531706.602 request_time=0.021 content_tracing=-");
        System.out.println("Original Message : " + payload);
    }

    /**
     * Simulates sending a RAL log entry.
     * @throws Exception
     */
    @Test
    public void testRalProducer() throws Exception {
        Map<String, String> headers = new LinkedHashMap<String, String>();
        headers.put("timestamp", "2016-9-23 10:53:14");
        headers.put("module", "RAL");
        headers.put("filename", "ral-worker.log.2016092310");
        headers.put("topic", "pids-0000000043");
        headers.put("host", "10.195.181.17");
        headers.put("lineoffset", "2557660");
        headers.put("inode", "144277667");
        payload.put("headers", headers);
        payload.put("body", "NOTICE: 09-23 10:53:14: ral-worker * 4153 [php_ral.cpp:1437][logid=3194356520 worker_id=10488 optime=1474599194.518921 product=waimai subsys=waimai module=marketing user_ip= local_ip=10.195.181.17 local_port=8086 msg=ral_write_log log_type=E_SUM caller=redis_c_marketing from=/home/map/odp_cater/php/phplib/wm/service/RedisBns.php:131 spanid=0.9.26 method=get conv=redis prot=redis retry=0%2F1 remote_ip=10.194.218.13%3A7490 idc=nj cost=0.181 talk=0.181 write=0 connect=0 read=0.181 req_start_time=1474599194.5186 err_no=0 err_info=OK req_data=a%3A1%3A%7Bi%3A0%3Bs%3A30%3A%22wm%3Amkt%3Abgt%3Afuse%3A201609%3A195%3A0%3A2%22%3B%7D]");
        System.out.println("Original Message : " + payload);
    }

    public void tearDown() throws Exception {
        // Serialize the record built by the test method and send it to the "doctorq" topic
        writer.write(payload, encoder);
        encoder.flush();
        out.close();
        serializedBytes = out.toByteArray();
        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("doctorq", serializedBytes);
        producer.send(message);
        producer.close();
    }
}
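Note that the serialization and send happen in tearDown(), so each test method only fills in the record and JUnit flushes it to the doctorq topic afterwards. To check what was actually written, the bytes can be decoded with the same schema. The following is a minimal sketch, not from the original post (the helper class and its name are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import java.io.File;

public class AvroDecodeCheck {
    // Decodes one Avro-binary message (as produced by the test above) back into a GenericRecord
    public static GenericRecord decode(byte[] serializedBytes) throws Exception {
        // Reuse the same schema file the producer test parses in setUp()
        Schema schema = new Schema.Parser().parse(new File("src/test/resources/test-schema.avsc"));
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        // Wrap the raw bytes in a binary decoder and read a single record
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null);
        return reader.read(null, decoder);
    }
}

Printing the returned GenericRecord should show the same headers map and body string that the test logged as "Original Message".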