Elasticsearch 與 Kafka 整合剖析
1.概述
目前,隨著大數據的浪潮,Kafka 被越來越多的企業所認可,如今的Kafka已發展到0.10.x,其優秀的特性也帶給我們解決實際業務的方案。對於數據分流來說,既可以分流到離線存儲平臺(HDFS),離線計算平臺(Hive倉庫),也可以分流實時流水計算(Storm,Spark)等,同樣也可以分流到海量數據查詢(HBase),或是及時查詢(ElasticSearch)。而今天筆者給大家分享的就是Kafka 分流數據到 ElasticSearch。
2.內容
我們知道,ElasticSearch是有其自己的套件的,簡稱ELK,即ElasticSearch,Logstash以及Kibana。ElasticSearch負責存儲,Logstash負責收集數據來源,Kibana負責可視化數據,分工明確。想要分流Kafka中的消息數據,可以使用Logstash的插件直接消費,但是需要我們編寫復雜的過濾條件,和特殊的映射處理,比如系統保留的`_uid`字段等需要我們額外的轉化。今天我們使用另外一種方式來處理數據,使用Kafka的消費API和ES的存儲API來處理分流數據。通過編寫Kafka消費者,消費對應的業務數據,將消費的數據通過ES存儲API,通過創建對應的索引的,存儲到ES中。其流程如下圖所示:
上圖可知,消費收集的數據,通過ES提供的存儲接口進行存儲。存儲的數據,這裏我們可以規劃,做定時調度。最後,我們可以通過Kibana來可視化ES中的數據,對外提供業務調用接口,進行數據共享。
3.實現
下面,我們開始進行實現細節處理,這裏給大家提供實現的核心代碼部分,實現代碼如下所示:
3.1 定義ES格式
我們以插件的形式進行消費,從Kafka到ES的數據流向,只需要定義插件格式,如下所示:
{ "job": { "content": { "reader": { "name": "kafka","parameter": { "topic": "kafka_es_client_error", "groupid": "es2", "bootstrapServers": "k1:9094,k2:9094,k3:9094" }, "threads": 6 }, "writer": { "name": "es","parameter": { "host": [ "es1:9300,es2:9300,es3:9300" ], "index": "client_error_%s", "type": "client_error" } } } } }
這裏處理消費存儲的方式,將讀和寫的源分開,配置各自屬性即可。
3.2 數據存儲
這裏,我們通過每天建立索引進行存儲,便於業務查詢,實現細節如下所示:
public class EsProducer { private final static Logger LOG = LoggerFactory.getLogger(EsProducer.class); private final KafkaConsumer<String, String> consumer; private ExecutorService executorService; private Configuration conf = null; private static int counter = 0; public EsProducer() { String root = System.getProperty("user.dir") + "/conf/"; String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path"); conf = Configuration.from(new File(root + path)); Properties props = new Properties(); props.put("bootstrap.servers", conf.getString("job.content.reader.parameter.bootstrapServers")); props.put("group.id", conf.getString("job.content.reader.parameter.groupid")); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(conf.getString("job.content.reader.parameter.topic"))); } public void execute() { executorService = Executors.newFixedThreadPool(conf.getInt("job.content.reader.threads")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); if (null != records) { executorService.submit(new KafkaConsumerThread(records, consumer)); } } } public void shutdown() { try { if (consumer != null) { consumer.close(); } if (executorService != null) { executorService.shutdown(); } if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { LOG.error("Shutdown kafka consumer thread timeout."); } } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } } class KafkaConsumerThread implements Runnable { private ConsumerRecords<String, String> records; public KafkaConsumerThread(ConsumerRecords<String, String> records, KafkaConsumer<String, String> consumer) { this.records = records; } @Override public void run() { String index = conf.getString("job.content.writer.parameter.index"); String type = conf.getString("job.content.writer.parameter.type"); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { JSONObject json = JSON.parseObject(record.value()); List<Map<String, Object>> list = new ArrayList<>(); Map<String, Object> map = new HashMap<>(); index = String.format(index, CalendarUtils.timeSpan2EsDay(json.getLongValue("_tm") * 1000L)); if (counter < 10) { LOG.info("Index : " + index); counter++; } for (String key : json.keySet()) { if ("_uid".equals(key)) { map.put("uid", json.get(key)); } else { map.put(key, json.get(key)); } list.add(map); } EsUtils.write2Es(index, type, list); } } } } }
這裏消費的數據源就處理好了,接下來,開始ES的存儲,實現代碼如下所示:
public class EsUtils { private static TransportClient client = null; static { if (client == null) { client = new PreBuiltTransportClient(Settings.EMPTY); } String root = System.getProperty("user.dir") + "/conf/"; String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path"); Configuration conf = Configuration.from(new File(root + path)); List<Object> hosts = conf.getList("job.content.writer.parameter.host"); for (Object object : hosts) { try { client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(object.toString().split(":")[0]), Integer.parseInt(object.toString().split(":")[1]))); } catch (Exception e) { e.printStackTrace(); } } } public static void write2Es(String index, String type, List<Map<String, Object>> dataSets) { BulkRequestBuilder bulkRequest = client.prepareBulk(); for (Map<String, Object> dataSet : dataSets) { bulkRequest.add(client.prepareIndex(index, type).setSource(dataSet)); } bulkRequest.execute().actionGet(); // if (client != null) { // client.close(); // } } public static void close() { if (client != null) { client.close(); } } }
這裏,我們利用BulkRequestBuilder進行批量寫入,減少頻繁寫入率。
4.調度
存儲在ES中的數據,如果不需要長期存儲,比如:我們只需要存儲及時查詢數據一個月,對於一個月以前的數據需要清除掉。這裏,我們可以編寫腳本直接使用Crontab來進行簡單調用即可,腳本如下所示:
#!/bin/sh # <Usage>: ./delete_es_by_day.sh kafka_error_client logsdate 30 </Usage>
echo "<Usage>: ./delete_es_by_day.sh kafka_error_client logsdate 30 </Usage>"
index_name=$1 daycolumn=$2 savedays=$3 format_day=$4 if [ ! -n "$savedays" ]; then echo "Oops. The args is not right,please input again...." exit 1 fi if [ ! -n "$format_day" ]; then format_day=‘%Y%m%d‘ fi sevendayago=`date -d "-${savedays} day " +${format_day}` curl -XDELETE "es1:9200/${index_name}/_query?pretty" -d " { "query": { "filtered": { "filter": { "bool": { "must": { "range": { "${daycolumn}": { "from": null, "to": ${sevendayago}, "include_lower": true, "include_upper": true } } } } } } } }" echo "Finished."
然後,在Crontab中進行定時調度即可。
5.總結
這裏,我們在進行數據寫入ES的時候,需要註意,有些字段是ES保留字段,比如`_uid`,這裏我們需要轉化,不然寫到ES的時候,會引發沖突導致異常,最終寫入失敗。
6.結束語
這篇博客就和大家分享到這裏,如果大家在研究學習的過程當中有什麽問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉
Elasticsearch 與 Kafka 整合剖析