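A Complete Guide to Real-Time Log Display with Flume, ElasticSearch, and Kibana
In one day I managed to use Flume to feed logs into ElasticSearch and bring up Kibana for a first look at them. These are my notes.
1: Setting up the ES cluster is not covered here; see: How to Set Up an ES Cluster
2: Getting Flume to work with ES
This part is the heart of the matter and is where most of the time went.
Flume's sinks do include an ElasticSearchSink, and my plan was to use it as-is. It turned out that my ES version was too new for it, and I did not want to roll back to an old 1.x release as suggested online. So I modified the flume-ng-elasticsearch-sink jar that ships with Flume, rebuilt it, and got it running.
Enough preamble; on to the details.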
JDK version: 1.8.0_111
Flume version: 1.6.0. If you are unsure which Flume version you have, look at the jars in the lib directory; their file names include the version.
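Reading the version off a jar name can also be done programmatically. The following is a minimal sketch, assuming jar names follow the usual artifact-version.jar pattern; the class JarVersion and its method are hypothetical names used only for illustration.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls the version suffix out of a jar file name. */
final class JarVersion {
    // Matches "<artifact>-<version>.jar", where the version starts with a digit.
    private static final Pattern JAR_NAME =
            Pattern.compile("^(.+?)-(\\d[\\w.\\-]*)\\.jar$");

    /** Returns the version part of the name, or empty if the name does not match. */
    static Optional<String> of(String jarFileName) {
        Matcher m = JAR_NAME.matcher(jarFileName);
        return m.matches() ? Optional.of(m.group(2)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(of("flume-ng-core-1.6.0.jar").orElse("unknown"));
    }
}
```

Any of the flume-ng-*.jar files in lib should report the same version.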
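ElasticSearch: 5.6.2.
The detailed steps follow:
1: flume-ng-elasticsearch-sink-1.6.0.jar
This jar lives in Flume's lib directory and provides the ElasticSearchSink. Find the Flume release matching your version on the official site and download its full source code.
I downloaded it from GitHub: I found the branch corresponding to flume-1.6, copied all of the code, and imported it into my IDE (Eclipse) as a Maven project.
After the import, the directory structure should look roughly as expected for this module; just change the build path to JDK 1.8.
Right after the import nothing reports an error on its own, but we have to apply our own configuration, let it break, and then fix the errors one by one.
First, edit pom.xml.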
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>flume-ng-sinks</artifactId>
    <groupId>org.apache.flume</groupId>
    <version>1.6.0</version>
  </parent>

  <groupId>org.apache.flume.flume-ng-sinks</groupId>
  <artifactId>flume-ng-elasticsearch-sink</artifactId>
  <name>Flume NG ElasticSearch Sink</name>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.rat</groupId>
        <artifactId>apache-rat-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
    <!-- Here I replaced the original optional dependency with the 5.6.2 version I need -->
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>5.6.2</version><!--$NO-MVN-MAN-VER$ -->
    </dependency>
    <!-- This artifact must be added, otherwise the build also reports errors -->
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>transport</artifactId>
      <version>5.6.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- Added for my own purposes; leave it out if you do not need it -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.44</version>
    </dependency>
  </dependencies>
</project>
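Every place I changed is marked with a comment. Since the ES I need to connect to is 5.6.2, I switched every ES-related version to 5.6.2.
2: After this change, errors start appearing all over the project. Errors in the test sources can be ignored; just skip tests when packaging.
The order of the steps below is not fixed, but once all of them are done everything should compile.
1: ContentBuilderUtil reports errors. I no longer remember exactly what I changed, so here is the source: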
/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. */
package org.apache.flume.sink.elasticsearch;

import java.io.IOException;
import java.nio.charset.Charset;

import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

/**
 * Utility methods for using ElasticSearch {@link XContentBuilder}
 */
public class ContentBuilderUtil {

  private static final Charset charset = Charset.defaultCharset();

  private ContentBuilderUtil() {
  }

  public static void appendField(XContentBuilder builder, String field,
      byte[] data) throws IOException {
    // Structured content (e.g. JSON) is embedded as-is; anything else is a string.
    XContentType contentType = XContentFactory.xContentType(data);
    if (contentType == null) {
      addSimpleField(builder, field, data);
    } else {
      addComplexField(builder, field, contentType, data);
    }
  }

  public static void addSimpleField(XContentBuilder builder,
      String fieldName, byte[] data) throws IOException {
    builder.field(fieldName, new String(data, charset));
  }

  public static void addComplexField(XContentBuilder builder,
      String fieldName, XContentType contentType, byte[] data)
      throws IOException {
    XContentParser parser = XContentFactory.xContent(contentType)
        .createParser(NamedXContentRegistry.EMPTY, data);
    parser.nextToken();
    // Add the field name, but not the value.
    builder.field(fieldName);
    try {
      // This will add the whole parsed content as the value of the field.
      builder.copyCurrentStructure(parser);
    } catch (Exception ex) {
      // If we get an exception here the most likely cause is nested JSON
      // that can't be figured out in the body. At this point just push it
      // through as is, we have already added the field so don't do it again
      builder.endObject();
      builder.field(fieldName, new String(data, charset));
    } finally {
      if (parser != null) {
        parser.close();
      }
    }
  }
}
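To make the simple/complex split in appendField easier to picture, here is a deliberately simplified stand-in that needs no ElasticSearch classes. It only checks whether the first non-whitespace byte is an opening brace; this is an assumption made for illustration, and the real XContentFactory.xContentType detection covers more formats than JSON. The class BodyKind is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified stand-in for the content-type check in appendField. */
final class BodyKind {
    /** True if the first non-whitespace byte is '{' (a rough "looks like JSON" test). */
    static boolean looksLikeJsonObject(byte[] data) {
        for (byte b : data) {
            if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
                continue; // skip leading whitespace
            }
            return b == '{';
        }
        return false; // empty or whitespace-only body
    }

    /** "complex" bodies would be embedded as objects, "simple" ones as strings. */
    static String describe(byte[] data) {
        return looksLikeJsonObject(data) ? "complex" : "simple";
    }

    public static void main(String[] args) {
        System.out.println(describe("{\"user\":\"jordan\"}".getBytes(StandardCharsets.UTF_8)));
        System.out.println(describe("plain log line".getBytes(StandardCharsets.UTF_8)));
    }
}
```

In practice this means a JSON log line ends up as a nested object in the document, while a plain-text line is stored as a single string field.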
2: ElasticSearchEventSerializer reports errors
/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. */
package org.apache.flume.sink.elasticsearch;

import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurableComponent;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
 * Interface for an event serializer which serializes the headers and body of an
 * event to write them to ElasticSearch. This is configurable, so any config
 * params required should be taken through this.
 */
public interface ElasticSearchEventSerializer extends Configurable,
    ConfigurableComponent {

  public static final Charset charset = Charset.defaultCharset();

  /**
   * Return an {@link XContentBuilder} made up of the serialized flume event
   *
   * @param event
   *          The flume event to serialize
   * @return A {@link XContentBuilder} used to write to ElasticSearch
   * @throws IOException
   *           If an error occurs during serialization
   */
  abstract XContentBuilder getContentBuilder(Event event) throws IOException;
}
3:ElasticSearchLogStashEventSerializer
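/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. */
package org.apache.flume.sink.elasticsearch;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.elasticsearch.common.xcontent.XContentBuilder;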
/**
 * Serialize flume events into the same format LogStash uses</p>
 *
 * This can be used to send events to ElasticSearch and use clients such as
 * Kibana which expect Logstash formatted indexes
 *
 * <pre>
 * {
 *    "@timestamp": "2010-12-21T21:48:33.309258Z",
 *    "@tags": [ "array", "of", "tags" ],
 *    "@type": "string",
 *    "@source": "source of the event, usually a URL.",
 *    "@source_host": "",
 *    "@source_path": "",
 *    "@fields":{
 *       # a set of fields for this event
 *       "user": "jordan",
 *       "command": "shutdown -r"
 *    },
 *    "@message": "the original plain-text message"
 * }
 * </pre>
 *
 * If the following headers are present, they will map to the above logstash
 * output as long as the logstash fields are not already present.</p>
 *
 * <pre>
 *  timestamp: long -> @timestamp:Date
 *  host: String -> @source_host: String
 *  src_path: String -> @source_path: String
 *  type: String -> @type: String
 *  source: String -> @source: String
 * </pre>
 *
 * @see https://github.com/logstash/logstash/wiki/logstash%27s-internal-message-format
 */
public class ElasticSearchLogStashEventSerializer implements
    ElasticSearchEventSerializer {

  @Override
  public XContentBuilder getContentBuilder(Event event) throws IOException {
    XContentBuilder builder = jsonBuilder().startObject();
    appendBody(builder, event);
    appendHeaders(builder, event);
    return builder;
  }

  private void appendBody(XContentBuilder builder, Event event)
      throws IOException, UnsupportedEncodingException {
    byte[] body = event.getBody();
    ContentBuilderUtil.appendField(builder, "@message", body);
  }

  private void appendHeaders(XContentBuilder builder, Event event)
      throws IOException {
    Map<String, String> headers = new HashMap<String, String>(
        event.getHeaders());

    String timestamp = headers.get("timestamp");
    if (!StringUtils.isBlank(timestamp)
        && StringUtils.isBlank(headers.get("@timestamp"))) {
      long timestampMs = Long.parseLong(timestamp);
      builder.field("@timestamp", new Date(timestampMs));
    }

    String source = headers.get("source");
    if (!StringUtils.isBlank(source)
        && StringUtils.isBlank(headers.get("@source"))) {
      ContentBuilderUtil.appendField(builder, "@source",
          source.getBytes(charset));
    }

    String type = headers.get("type");
    if (!StringUtils.isBlank(type)
        && StringUtils.isBlank(headers.get("@type"))) {
      ContentBuilderUtil.appendField(builder, "@type",
          type.getBytes(charset));
    }

    String host = headers.get("host");
    if (!StringUtils.isBlank(host)
        && StringUtils.isBlank(headers.get("@source_host"))) {
      ContentBuilderUtil.appendField(builder, "@source_host",
          host.getBytes(charset));
    }

    String srcPath = headers.get("src_path");
    if (!StringUtils.isBlank(srcPath)
        && StringUtils.isBlank(headers.get("@source_path"))) {
      ContentBuilderUtil.appendField(builder, "@source_path",
          srcPath.getBytes(charset));
    }

    builder.startObject("@fields");
    for (String key : headers.keySet()) {
      byte[] val = headers.get(key).getBytes(charset);
      ContentBuilderUtil.appendField(builder, key, val);
    }
    builder.endObject();
  }

  @Override
  public void configure(Context context) {
    // NO-OP...
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // NO-OP...
  }
}
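The header mapping performed by appendHeaders can be tried out without Flume or ElasticSearch on the classpath. The following is a minimal sketch, assuming plain Java Maps in place of XContentBuilder; LogstashHeaderMapping and its helper methods are hypothetical names, not part of Flume.

```java
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Stand-alone sketch of the header mapping; plain Maps replace XContentBuilder. */
final class LogstashHeaderMapping {

    static Map<String, Object> toDocument(String body, Map<String, String> headers) {
        Map<String, Object> doc = new LinkedHashMap<String, Object>();
        doc.put("@message", body);

        // timestamp (epoch millis as text) becomes a Date under @timestamp
        String timestamp = headers.get("timestamp");
        if (!isBlank(timestamp) && isBlank(headers.get("@timestamp"))) {
            doc.put("@timestamp", new Date(Long.parseLong(timestamp)));
        }
        copyIfAbsent(headers, "source", "@source", doc);
        copyIfAbsent(headers, "type", "@type", doc);
        copyIfAbsent(headers, "host", "@source_host", doc);
        copyIfAbsent(headers, "src_path", "@source_path", doc);

        // As in the serializer, every header is also kept under "@fields".
        doc.put("@fields", new HashMap<String, String>(headers));
        return doc;
    }

    /** Copies headers[from] to doc[to] unless the logstash-style header already exists. */
    private static void copyIfAbsent(Map<String, String> headers, String from,
                                     String to, Map<String, Object> doc) {
        String value = headers.get(from);
        if (!isBlank(value) && isBlank(headers.get(to))) {
            doc.put(to, value);
        }
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("host", "web01");
        headers.put("timestamp", "1496802420000");
        System.out.println(toDocument("the original plain-text message", headers));
    }
}
```

Note that in the modified sink described below, the event body is parsed into a Map directly, so this serializer path is only taken when a serializer-based setup is used.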
4:ElasticSearchTransportClient
/* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. */
package org.apache.flume.sink.elasticsearch.client;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
public class ElasticSearchTransportClient implements ElasticSearchClient {

  public static final Logger logger = LoggerFactory
      .getLogger(ElasticSearchTransportClient.class);

  private InetSocketTransportAddress[] serverAddresses;
  private ElasticSearchEventSerializer serializer;
  private ElasticSearchIndexRequestBuilderFactory indexRequestBuilderFactory;
  private BulkRequestBuilder bulkRequestBuilder;

  private Client client;

  @VisibleForTesting
  InetSocketTransportAddress[] getServerAddresses() {
    return serverAddresses;
  }

  @VisibleForTesting
  void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
    this.bulkRequestBuilder = bulkRequestBuilder;
  }

  /**
   * Transport client for external cluster
   *
   * @param hostNames
   * @param clusterName
   * @param serializer
   */
  public ElasticSearchTransportClient(String[] hostNames, String clusterName,
      ElasticSearchEventSerializer serializer) {
    configureHostnames(hostNames);
    this.serializer = serializer;
    openClient(clusterName);
  }

  public ElasticSearchTransportClient(String[] hostNames, String clusterName,
      ElasticSearchIndexRequestBuilderFactory indexBuilder) {
    configureHostnames(hostNames);
    this.indexRequestBuilderFactory = indexBuilder;
    openClient(clusterName);
  }

  /**
   * Local transport client only for testing
   *
   * @param indexBuilderFactory
   */
  // public ElasticSearchTransportClient(
  // ElasticSearchIndexRequestBuilderFactory indexBuilderFactory) {
  // this.indexRequestBuilderFactory = indexBuilderFactory;
  // openLocalDiscoveryClient();
  // }

  /**
   * Local transport client only for testing
   *
   * @param serializer
   */
  // public ElasticSearchTransportClient(ElasticSearchEventSerializer
  // serializer) {
  // this.serializer = serializer;
  // openLocalDiscoveryClient();
  // }

  /**
   * Used for testing
   *
   * @param client
   *          ElasticSearch Client
   * @param serializer
   *          Event Serializer
   */
  public ElasticSearchTransportClient(Client client,
      ElasticSearchEventSerializer serializer) {
    this.client = client;
    this.serializer = serializer;
  }

  /**
   * Used for testing
   *
   * @param client
   *          ElasticSearch Client
   * @param requestBuilderFactory
   *          Index request builder factory
   */
  public ElasticSearchTransportClient(Client client,
      ElasticSearchIndexRequestBuilderFactory requestBuilderFactory)
      throws IOException {
    this.client = client;
    requestBuilderFactory.createIndexRequest(client, null, null, null);
  }
  private void configureHostnames(String[] hostNames) {
    logger.warn(Arrays.toString(hostNames));
    serverAddresses = new InetSocketTransportAddress[hostNames.length];
    for (int i = 0; i < hostNames.length; i++) {
      String[] hostPort = hostNames[i].trim().split(":");
      String host = hostPort[0].trim();
      int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1]
          .trim()) : DEFAULT_PORT;
      // Modified here: resolve the host to an InetAddress first
      try {
        serverAddresses[i] = new InetSocketTransportAddress(
            InetAddress.getByName(host), port);
      } catch (UnknownHostException e) {
        e.printStackTrace();
      }
    }
  }

  @Override
  public void close() {
    if (client != null) {
      client.close();
    }
    client = null;
  }
  /**
   * Converts an exception's stack trace to a string.
   *
   * Author: yuzhao.yang, 2017-06-07 10:27:00
   *
   * @param e
   *          the exception to convert
   * @return the stack trace as text
   */
  public String transfer(Exception e) throws Exception {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    // autoflush flushes after every println, so buf holds the full trace
    e.printStackTrace(new PrintWriter(buf, true));
    String expMessage = buf.toString();
    buf.close();
    return expMessage;
  }
  @Override
  public void addEvent(Event event, IndexNameBuilder indexNameBuilder,
      String indexType, long ttlMs) throws Exception {
    if (bulkRequestBuilder == null) {
      bulkRequestBuilder = client.prepareBulk();
    }

    IndexRequestBuilder indexRequestBuilder = null;
    if (indexRequestBuilderFactory == null) {
      // Modified: parse the JSON body into a Map instead of using XContentBuilder
      Map<String, ?> map = null;
      try {
        String body = new String(event.getBody());
        logger.error("Event body: " + body);
        map = (Map<String, ?>) JSON.parse(body);
      } catch (Exception e) {
        logger.error("getContentBuilder exception: " + transfer(e));
      }
      indexRequestBuilder = client.prepareIndex(
          indexNameBuilder.getIndexName(event), indexType).setSource(
          map);
    } else {
      indexRequestBuilder = indexRequestBuilderFactory
          .createIndexRequest(client,
              indexNameBuilder.getIndexPrefix(event), indexType,
              event);
    }

    if (ttlMs > 0) {
      indexRequestBuilder.setTTL(ttlMs);
    }
    bulkRequestBuilder.add(indexRequestBuilder);
  }

  @Override
  public void execute() throws Exception {
    try {
      BulkResponse bulkResponse = bulkRequestBuilder.execute()
          .actionGet();
      if (bulkResponse.hasFailures()) {
        throw new EventDeliveryException(
            bulkResponse.buildFailureMessage());
      }
    } finally {
      bulkRequestBuilder = client.prepareBulk();
    }
  }
  /**
   * Open client to elasticsearch cluster
   *
   * @param clusterName
   */
  private void openClient(String clusterName) {
    Settings settings = Settings.builder().put("cluster.name", clusterName)
        .build();

    // TransportClient transportClient = new TransportClient(settings);
    // for (InetSocketTransportAddress host : serverAddresses) {
    // transportClient.addTransportAddress(host);
    // }

    TransportClient transportClient = null;
    for (InetSocketTransportAddress host : serverAddresses) {
      if (null == transportClient) {
        transportClient = new PreBuiltTransportClient(settings)
            .addTransportAddress(host);
      } else {
        transportClient = transportClient.addTransportAddress(host);
      }
    }
    if (client != null) {
      client.close();
    }
    client = transportClient;
  }

  /*
   * FOR TESTING ONLY...
   *
   * Opens a local discovery node for talking to an elasticsearch server
   * running in the same JVM
   */
  // private void openLocalDiscoveryClient() {
  // logger.info("Using ElasticSearch AutoDiscovery mode");
  // Node node = NodeBuilder.nodeBuilder().client(true).local(true).node();
  // if (client != null) {
  // client.close();
  // }
  // client = node.client();
  // }

  @Override
  public void configure(Context context) {
    // To change body of implemented methods use File | Settings | File
    // Templates.
  }
}
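In ElasticSearchTransportClient, my main change is inside addEvent: using XContentBuilder directly kept failing, so I convert the data through a Map instead.
Beyond that, there is essentially nothing else that needs changing.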
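One detail of configureHostnames worth a second look is that an unresolvable host is only printed and leaves an empty slot in the address array. The following is a minimal sketch of a fail-fast variant, using only JDK classes; it is not the code from the post. HostPortParser is a hypothetical name, and the default port 9300 is an assumption about the value of ElasticSearchSinkConstants.DEFAULT_PORT.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

/** Hypothetical fail-fast variant of the host:port parsing in configureHostnames. */
final class HostPortParser {
    // Assumed default transport port; check ElasticSearchSinkConstants.DEFAULT_PORT.
    static final int DEFAULT_PORT = 9300;

    /** Parses "host" or "host:port" and resolves the host, failing loudly if it cannot. */
    static InetSocketAddress parse(String hostName) {
        String[] hostPort = hostName.trim().split(":");
        String host = hostPort[0].trim();
        int port = hostPort.length == 2
                ? Integer.parseInt(hostPort[1].trim())
                : DEFAULT_PORT;
        try {
            return new InetSocketAddress(InetAddress.getByName(host), port);
        } catch (UnknownHostException e) {
            // Surface the problem at startup instead of leaving a null address behind.
            throw new IllegalArgumentException("Cannot resolve Elasticsearch host: " + host, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("127.0.0.1:9300"));
    }
}
```

Whether failing at startup is preferable to skipping a bad host depends on the deployment; with a single ES node, failing early is usually easier to diagnose.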
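I may have forgotten some of the changes, so I put the modified code on GitHub: link to the modified code
Note: my data is in Map (JSON) form, which is why I changed the transport client as shown above. If your data has a different format, think about an implementation that suits it.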
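As an illustration of handling a different body format, the sketch below turns a key=value style log line into a Map of the kind passed to setSource in addEvent. The key=value format, the class name KeyValueBodyParser, and the fallback field name "message" are all assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for bodies such as "level=INFO user=jordan". */
final class KeyValueBodyParser {

    static Map<String, Object> parse(byte[] body) {
        String line = new String(body, StandardCharsets.UTF_8).trim();
        Map<String, Object> fields = new LinkedHashMap<String, Object>();
        if (line.isEmpty()) {
            return fields;
        }
        for (String token : line.split("\\s+")) {
            int eq = token.indexOf('=');
            if (eq > 0) {
                // text before the first '=' is the field name, the rest is the value
                fields.put(token.substring(0, eq), token.substring(eq + 1));
            }
        }
        if (fields.isEmpty()) {
            // No key=value pair found: keep the whole line as one field.
            fields.put("message", line);
        }
        return fields;
    }

    public static void main(String[] args) {
        byte[] body = "level=INFO user=jordan".getBytes(StandardCharsets.UTF_8);
        System.out.println(parse(body));
    }
}
```

The resulting Map could take the place of the JSON-parsed map in addEvent; values containing spaces would need a more careful tokenizer than this one.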
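Once the code changes are done, package the module directly. Although pom.xml declares no packaging plugin, the default Maven packaging logic is enough, and Maven downloads the plugins it needs automatically (for example: mvn clean package -DskipTests).
The resulting jar is very small, because most dependency jars are not bundled into it. So the next step is to update and replace jars under Flume's lib directory. Here is the full list of jars in my Flume lib directory, for reference: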
apache-log4j-extras-1.1.jar flume-ng-kafka-sink-1.6.0.jar kite-data-core-1.0.0.jar parquet-avro-1.4.1.jar
async-1.4.0.jar flume-ng-log4jappender-1.6.0.jar kite-data-hbase-1.0.0.jar parquet-column-1.4.1.jar
asynchbase-1.5.0.jar flume-ng-morphline-solr-sink-1.6.0.jar kite-data-hive-1.0.0.jar parquet-common-1.4.1.jar
avro-1.7.4.jar flume-ng-node-1.6.0.jar kite-hadoop-compatibility-1.0.0.jar parquet-encoding-1.4.1.jar
avro-ipc-1.7.4.jar flume-ng-sdk-1.6.0.jar lang-mustache-client-5.6.2.jar parquet-format-2.0.0.jar
commons-cli-1.2.jar flume-scribe-source-1.6.0.jar libthrift-0.9.0.jar parquet-generator-1.4.1.jar
commons-codec-1.8.jar flume-spillable-memory-channel-1.6.0.jar log4j-1.2.17.jar parquet-hadoop-1.4.1.jar
commons-collections-3.2.1.jar flume-thrift-source-1.6.0.jar log4j-api-2.9.1.jar parquet-hive-bundle-1.4.1.jar
commons-compress-1.4.1.jar flume-tools-1.6.0.jar lucene-analyzers-common-6.6.1.jar parquet-jackson-1.4.1.jar
commons-dbcp-1.4.jar flume-twitter-source-1.6.0.jar lucene-backward-codecs-6.6.1.jar percolator-client-5.6.2.jar
commons-io-2.1.jar gson-2.2.2.jar lucene-core-6.6.1.jar plugin-cli-5.6.2.jar
commons-jexl-2.1.1.jar guava-11.0.2.jar lucene-grouping-6.6.1.jar protobuf-java-2.5.0.jar
commons-lang-2.5.jar HdrHistogram-2.1.9.jar lucene-highlighter-6.6.1.jar reindex-client-5.6.2.jar
commons-logging-1.1.1.jar hppc-0.7.1.jar lucene-join-6.6.1.jar scala-library-2.10.1.jar
commons-pool-1.5.4.jar httpclient-4.2.1.jar lucene-memory-6.6.1.jar securesm-1.1.jar
curator-client-2.6.0.jar httpcore-4.1.3.jar lucene-misc-6.6.1.jar serializer-2.7.2.jar
curator-framework-2.6.0.jar irclib-1.10.jar lucene-queries-6.6.1.jar servlet-api-2.5-20110124.jar
curator-recipes-2.6.0.jar jackson-annotations-2.3.0.jar lucene-queryparser-6.6.1.jar slf4j-api-1.6.1.jar
derby-10.8.2.2.jar jackson-core-2.8.6.jar lucene-sandbox-6.6.1.jar slf4j-log4j12-1.6.1.jar
elasticsearch-5.6.2.jar jackson-core-asl-1.9.3.jar lucene-spatial3d-6.6.1.jar snakeyaml-1.15.jar
flume-avro-source-1.6.0.jar jackson-databind-2.3.1.jar lucene-spatial-6.6.1.jar snappy-java-1.1.0.jar
flume-dataset-sink-1.6.0.jar jackson-dataformat-cbor-2.8.6.jar lucene-spatial-extras-6.6.1.jar spatial4j-0.6.jar
flume-file-channel-1.6.0.jar jackson-dataformat-smile-2.8.6.jar lucene-suggest-6.6.1.jar t-digest-3.0.jar
flume-hdfs-sink-1.6.0.jar jackson-dataformat-yaml-2.8.6.jar mapdb-0.9.9.jar transport-5.6.2.jar
flume-hive-sink-1.6.0.jar jackson-mapper-asl-1.9.3.jar metrics-core-2.2.0.jar transport-netty3-client-5.6.2.jar
flume-irc-sink-1.6.0.jar java-version-checker-5.6.2.jar mina-core-2.0.4.jar transport-netty4-client-5.6.2.jar
flume-jdbc-channel-1.6.0.jar jetty-6.1.26.jar netty-3.5.12.Final.jar twitter4j-core-3.0.3.jar
flume-jms-source-1.6.0.jar jetty-util-6.1.26.jar netty-buffer-4.1.13.Final.jar twitter4j-media-support-3.0.3.jar
flume-kafka-channel-1.6.0.jar jna-4.4.0-1.jar netty-codec-4.1.13.Final.jar twitter4j-stream-3.0.3.jar
flume-kafka-source-1.6.0.jar joda-time-2.1.jar netty-common-4.1.13.Final.jar velocity-1.7.jar
flume-ng-auth-1.6.0.jar joda-time-2.9.5.jar netty-handler-4.1.13.Final.jar xalan-2.7.2.jar
flume-ng-configuration-1.6.0.jar jopt-simple-3.2.jar netty-resolver-4.1.13.Final.jar xercesImpl-2.9.1.jar
flume-ng-core-1.6.0.jar jopt-simple-5.0.2.jar netty-transport-4.1.13.Final.jar xml-apis-1.3.04.jar
flume-ng-elasticsearch-sink-1.6.0.jar jsr305-1.3.9.jar opencsv-2.3.jar xz-1.0.jar
flume-ng-embedded-agent-1.6.0.jar jts-1.13.jar paranamer-2.3.jar zkclient-0.3.jar
flume-ng-hbase-sink-1.6.0.jar kafka_2.10-0.8.1.1.jar parent-join-client-5.6.2.jar
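I first copied every jar from ElasticSearch's lib directory into Flume's lib directory, then copied over from my own project all the jars that were not already present. After that, Flume ran normally.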
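Merging jars from several places can leave two versions of the same library in lib. The following is a minimal sketch for spotting such pairs from a list of file names, assuming the usual artifact-version.jar naming; JarConflicts is a hypothetical helper, not part of Flume.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reports artifacts that appear with more than one version. */
final class JarConflicts {
    // Matches "<artifact>-<version>.jar", where the version starts with a digit.
    private static final Pattern JAR_NAME =
            Pattern.compile("^(.+?)-(\\d[\\w.\\-]*)\\.jar$");

    /** Returns artifact -> versions, restricted to artifacts with two or more versions. */
    static Map<String, List<String>> find(List<String> jarFileNames) {
        Map<String, List<String>> byArtifact = new TreeMap<String, List<String>>();
        for (String name : jarFileNames) {
            Matcher m = JAR_NAME.matcher(name);
            if (m.matches()) {
                byArtifact.computeIfAbsent(m.group(1), k -> new ArrayList<String>())
                          .add(m.group(2));
            }
        }
        // Drop artifacts that only occur once.
        byArtifact.values().removeIf(versions -> versions.size() < 2);
        return byArtifact;
    }

    public static void main(String[] args) {
        List<String> lib = Arrays.asList(
                "joda-time-2.1.jar", "joda-time-2.9.5.jar",
                "jopt-simple-3.2.jar", "jopt-simple-5.0.2.jar",
                "elasticsearch-5.6.2.jar");
        System.out.println(find(lib));
    }
}
```

The listing above does contain such pairs (joda-time 2.1 and 2.9.5, jopt-simple 3.2 and 5.0.2). The setup ran with both present, but which version gets loaded depends on classpath order, so removing the older copy may be worth testing.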
Checking the logs confirmed that the data really was delivered to ES. Problem solved.
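Besides reading the Flume log, the result can be cross-checked by listing indices over ES's HTTP API. The following is a minimal sketch using only JDK classes; it assumes ES's HTTP endpoint is on port 9200 at 192.168.100.34 (the address used in the Kibana configuration in this post), and EsIndexCheck is a hypothetical class name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: lists ES indices through the _cat/indices endpoint. */
final class EsIndexCheck {

    static String catIndicesUrl(String host, int httpPort) {
        return "http://" + host + ":" + httpPort + "/_cat/indices?v";
    }

    /** Performs a GET request and returns the response body as text. */
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder sb = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                sb.append(line).append('\n');
            }
            return sb.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass a host to actually query it; with no arguments only the URL is printed.
        String host = args.length > 0 ? args[0] : "192.168.100.34";
        String url = catIndicesUrl(host, 9200);
        System.out.println("GET " + url);
        if (args.length > 0) {
            System.out.println(fetch(url));
        }
    }
}
```

If the sink is working, the index named in the Flume sink configuration should appear in the output with a growing document count.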
3: Basic Kibana usage
I am using kibana-5.6.2-linux-x86_64, which can be downloaded directly from the ES website. Unpack it and edit the configuration file; here is mine:
# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601
# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "0.0.0.0"
# Enables you to specify a path to mount Kibana at if you are running behind a proxy. This only affects
# the URLs generated by Kibana, your proxy is expected to remove the basePath value before forwarding requests
# to Kibana. This setting cannot end in a slash.
#server.basePath: ""
# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576
# The Kibana server's name. This is used for display purposes.
#server.name: "your-hostname"
# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://192.168.100.34:9200"
# When this setting's value is true Kibana uses the hostname specified in the server.host
# setting. When the value of this setting is false, Kibana uses the hostname of the host
# that connects to this Kibana instance.
#elasticsearch.preserveHost: true
# Kibana uses an index in Elasticsearch to store saved searches, visualizations and
# dashboards. Kibana creates a new index if the index doesn't already exist.
kibana.index: ".kibana"
# The default application to load.
#kibana.defaultAppId: "discover"
# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "user"
#elasticsearch.password: "pass"
# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key
# Optional settings that provide the paths to the PEM-format SSL certificate and key files.
# These files validate that your Elasticsearch backend uses the same key files.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key
# Optional setting that enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ]
# To disregard the validity of SSL certificates, change this setting's value to 'none'.
#elasticsearch.ssl.verificationMode: full
# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
#elasticsearch.pingTimeout: 1500
# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
#elasticsearch.requestTimeout: 30000
# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [ authorization ]
# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}
# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
#elasticsearch.shardTimeout: 0
# Time in milliseconds to wait for Elasticsearch at Kibana startup before retrying.
#elasticsearch.startupTimeout: 5000
# Specifies the path where Kibana creates the process ID file.
#pid.file: /var/run/kibana.pid
# Enables you to specify a file where Kibana stores log output.
#logging.dest: stdout
# Set the value of this setting to true to suppress all logging output.
#logging.silent: false
# Set the value of this setting to true to suppress all logging output other than error messages.
#logging.quiet: false
# Set the value of this setting to true to log all events, including system usage information
# and all requests.
#logging.verbose: false
# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000.
#ops.interval: 5000
# The default locale. This locale can be used in certain circumstances to substitute any missing
# translations.
#i18n.defaultLocale: "en"
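The only settings I changed are server.host and elasticsearch.url, and both are self-explanatory.
Start Kibana; it comes up successfully.
Then open hostIp:5601 in a browser.
Congratulations, you are now looking at the Kibana start page. How to actually use Kibana is a topic for another article.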