Deploying and performance-testing a single-node spark + kafka + Elasticsearch environment
Published: 2019-02-17
Version selection
spark 1.5.2 + kafka 0.9.0.1 + Elasticsearch 2.2.1
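For reference, these versions correspond to the following release tarballs. The Apache links point at archive.apache.org; the elastic.co paths are assumptions (they are not from the original post) and may have moved, so adjust them if they no longer resolve.
wget https://archive.apache.org/dist/hadoop/common/hadoop-2.4.1/hadoop-2.4.1.tar.gz
wget https://archive.apache.org/dist/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.4.tgz
wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz
# the two elastic.co URLs below are assumptions; check the official download pages if they 404
wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.1/elasticsearch-2.2.1.tar.gz
wget https://download.elastic.co/kibana/kibana/kibana-4.4.2-linux-x64.tar.gz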
Installation and deployment
1. Installation scripts and files (password: 4m7l)
2. Using the scripts
- vi /etc/hosts
  Add a line mapping this machine's hostname to 127.0.0.1, e.g. 127.0.0.1 <hostname>
- cd npminstall
install.sh
#!/bin/sh
DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`
upperhome=`cd "$DIRNAME/.."; pwd`
export username=root
export installpath=/data/mdware
export hadoopFileName=hadoop-2.4.1
export hadoopPackageName=hadoop-2.4.1.tar.gz
export sparkFileName=spark-1.5.2-bin-hadoop2.4
export sparkPackageName=spark-1.5.2-bin-hadoop2.4.tgz
export kafkaFileName=kafka_2.10-0.9.0.1
export kafkaPackageName=kafka_2.10-0.9.0.1.tgz
export elasticFileName=elasticsearch-2.2.1
export elasticPackageName=elasticsearch-2.2.1.tar.gz
export kibanaFileName=kibana-4.4.2-linux-x64
export kibanaPackageName=kibana-4.4.2-linux-x64.tar.gz
export installServices="hadoop
spark
kafka
elastic
kibana"
mkdir -p $installpath
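# For each selected component: copy its tarball into $installpath, unpack it,
# overwrite the default config with the bundled one, and symlink a fixed path.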
for com in $installServices ; do
if [ "$com" = "hadoop" ] ; then
cp $localhome/files/$hadoopPackageName $installpath
cd $installpath && tar -zxf $hadoopPackageName
\cp -r $localhome/conf/hadoop/* $installpath/$hadoopFileName/etc/hadoop/
sh $installpath/$hadoopFileName/bin/hdfs namenode -format
rm -rf $installpath/$hadoopPackageName
ln -s $installpath/$hadoopFileName/ $installpath/hadoop
fi
if [ "$com" = "spark" ] ; then
cp $localhome/files/$sparkPackageName $installpath
cd $installpath && tar -zxf $sparkPackageName
\cp -r $localhome/conf/spark-env.sh $installpath/$sparkFileName/conf/
rm -rf $installpath/$sparkPackageName
ln -s $installpath/$sparkFileName/ $installpath/spark
fi
if [ "$com" = "kafka" ] ; then
cp $localhome/files/$kafkaPackageName $installpath
cd $installpath && tar -zxf $kafkaPackageName
\cp $localhome/conf/server.properties $installpath/$kafkaFileName/config/
rm -rf $installpath/$kafkaPackageName
ln -s $installpath/$kafkaFileName/ $installpath/kafka
fi
if [ "$com" = "elastic" ] ; then
cp $localhome/files/$elasticPackageName $installpath
cd $installpath && tar -zxf $elasticPackageName
\cp $localhome/conf/elasticsearch.yml $installpath/$elasticFileName/config/
rm -rf $installpath/$elasticPackageName
ln -s $installpath/$elasticFileName/ $installpath/es
$installpath/es/bin/plugin install mobz/elasticsearch-head/2.2.1
$installpath/es/bin/plugin install lmenezes/elasticsearch-kopf/2.2.1
fi
if [ "$com" = "kibana" ] ; then
cp $localhome/files/$kibanaPackageName $installpath
cd $installpath && tar -zxf $kibanaPackageName
rm -rf $installpath/$kibanaPackageName
ln -s $installpath/$kibanaFileName/ $installpath/kibana
fi
done
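# Register manage.sh as a SysV init service named "npm" and enable it at boot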
chmod +x $localhome/manage.sh
cp $localhome/manage.sh /etc/init.d/npm
chkconfig npm on
chmod +x install.sh
./install.sh
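For reference, the directory layout install.sh expects next to itself, inferred from the copy commands above (file names under files/ and conf/ must match the variables at the top of the script):
# npminstall/
# ├── install.sh
# ├── manage.sh
# ├── conf/
# │   ├── hadoop/            # Hadoop configs, copied into etc/hadoop/
# │   ├── spark-env.sh
# │   ├── server.properties  # Kafka broker config
# │   └── elasticsearch.yml
# └── files/
#     ├── hadoop-2.4.1.tar.gz
#     ├── spark-1.5.2-bin-hadoop2.4.tgz
#     ├── kafka_2.10-0.9.0.1.tgz
#     ├── elasticsearch-2.2.1.tar.gz
#     └── kibana-4.4.2-linux-x64.tar.gz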
3. Start the services
service npm start
The npm service script (installed as /etc/init.d/npm)
#!/bin/bash
# chkconfig: 2345 20 81
# description: start and stop npm service
# processname: npm
. /etc/rc.d/init.d/functions
prog="npm"
DIRNAME=`dirname "$0"`
localhome=`cd "$DIRNAME"; pwd`
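# Elasticsearch heap size: total physical memory in GB (rounded up), divided by four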
mem_size=`free -g | awk 'NR==2{print $2}'`
mem_size=`expr ${mem_size} + 1`
heap_size=`expr ${mem_size} / 4`
export installpath=/data/mdware
start(){
ulimit -n 655360
sh $installpath/hadoop/sbin/hadoop-daemon.sh start namenode
sh $installpath/hadoop/sbin/hadoop-daemon.sh start datanode
$installpath/hadoop/bin/hdfs dfsadmin -safemode leave
sh $installpath/spark/sbin/start-master.sh
sh $installpath/spark/sbin/start-slave.sh spark://localhost:7077
nohup $installpath/kafka/bin/zookeeper-server-start.sh $installpath/kafka/config/zookeeper.properties >> $installpath/kafka/zookeeper.log &
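# give ZooKeeper time to come up before starting the Kafka broker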
sleep 60
nohup $installpath/kafka/bin/kafka-server-start.sh $installpath/kafka/config/server.properties >> $installpath/kafka/kafka.log &
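# use the heap computed above; es.insecure.allow.root=true lets ES 2.x start as the root user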
export ES_HEAP_SIZE=${heap_size}g
$installpath/es/bin/elasticsearch -Des.insecure.allow.root=true -d
}
stop(){
sh $installpath/hadoop/sbin/hadoop-daemon.sh stop namenode
sh $installpath/hadoop/sbin/hadoop-daemon.sh stop datanode
sh $installpath/spark/sbin/stop-master.sh
sh $installpath/spark/sbin/stop-slave.sh
zookeeper_id=`ps -ef | grep -i zookeeper.properties | grep -v grep | awk '{print $2}'`
if [[ -z $zookeeper_id ]];then
echo "The task is not running ! "
else
kill ${zookeeper_id}
fi
kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
if [[ -z $kafka_id ]];then
echo "The task is not running ! "
else
kill ${kafka_id}
fi
es_id=`ps -ef|grep -i elasticsearch | grep -v "grep"|awk '{print $2}'`
if [[ -z $es_id ]];then
echo "The task is not running ! "
else
kill ${es_id}
fi
sleep 20
zookeeper_id=`ps -ef | grep -i zookeeper.properties | grep -v grep | awk '{print $2}'`
if [[ -z $zookeeper_id ]];then
echo "The task is not running ! "
else
kill -9 ${zookeeper_id}
fi
kafka_id=`ps -ef | grep -i server.properties | grep -v grep | awk '{print $2}'`
if [[ -z $kafka_id ]];then
echo "The task is not running ! "
else
kill -9 ${kafka_id}
fi
es_id=`ps -ef|grep -i elasticsearch | grep -v "grep"|awk '{print $2}'`
if [[ -z $es_id ]];then
echo "The task is not running ! "
else
kill -9 ${es_id}
fi
}
case "$1" in
start)
start
;;
stop)
stop
;;
*)
echo $"Usage: $0 {start|stop}"
exit 2
esac
exit $?
Note: the services are registered to start automatically at boot.
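After starting, a quick sanity check (a sketch, assuming the default ports and the /data/mdware install path used above):
jps                                           # expect NameNode, DataNode, Master, Worker, QuorumPeerMain, Kafka, Elasticsearch
curl -s http://localhost:9200/                # Elasticsearch version banner
curl -s 'http://localhost:9200/_cat/health?v' # cluster health
/data/mdware/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list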
Test code
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;
// Config is a project-local helper that loads the *.cnf properties files
public class KafkaDataProducer implements Runnable {
private static Logger log = Logger.getLogger(KafkaDataProducer.class);
private static Producer<String, String> producer;
private String topic;
private String path;
public KafkaDataProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
public KafkaDataProducer(String topic, String path) {
this.path = path;
this.topic = topic;
Properties props = new Properties();
props.put("bootstrap.servers", Config.getConfig("database.cnf").getProperty("bootstrap.server"));
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
public static void main(String[] args) throws Exception {
KafkaDataProducer kafkaDataProducer1 = new KafkaDataProducer("test","datafile");
new Thread(kafkaDataProducer1).start();
// KafkaDataProducer kafkaDataProducer2 = new KafkaDataProducer("tcptest","tcp.file");
// new Thread(kafkaDataProducer2).start();
//
// KafkaDataProducer kafkaDataProducer3 = new KafkaDataProducer("httptest","http.file");
// new Thread(kafkaDataProducer3).start();
}
@Override
public void run() {
BufferedReader br = null;
try {
while ( true ) {
br = new BufferedReader(new FileReader(Config.getConfig("database.cnf").getProperty(path)));
String line;
while ((line = br.readLine()) != null) {
if (!"".equals(line.trim())) {
producer.send(new ProducerRecord<>(topic, "", line));
}
}
Thread.sleep(Long.valueOf(Config.getConfig("database.cnf").getProperty("sleep.time")));
}
} catch (Exception e) {
log.error("The read streaming error: ", e);
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
log.warn("close the read streaming error: ", e);
}
}
}
}
}
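The producer pulls its settings from database.cnf through the project's Config helper. A minimal sketch of that file, assuming Config reads standard Java properties; only the key names come from the code above, the paths and values are examples. A console-consumer check that messages are arriving follows.
cat > database.cnf <<'EOF'
# Kafka broker the producer and the Spark job connect to
bootstrap.server=localhost:9092
# consumer group used by the Spark streaming job
group.id=ssd-test
# sample data file replayed line by line into the "test" topic (example path)
datafile=/data/test/database.dat
# pause between replays of the file, in milliseconds (example value)
sleep.time=60000
EOF

# verify messages reach the topic (Kafka 0.9 console consumer)
/data/mdware/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test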
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.commons.codec.binary.Base64;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import com.google.common.collect.ImmutableMap;
// Analysis (provides setupRawStreamFromKafka), Config, DBKey and DBData are project-local classes
public class SSDPerformanceTest extends Analysis {
public static final Logger LOG = LoggerFactory.getLogger(SSDPerformanceTest.class);
protected static final Pattern TAB = Pattern.compile("\t");
private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm", Locale.CHINA);
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd", Locale.CHINA);
public static void main(String[] args) throws IOException {
String configfile = "database.cnf";
Properties config = Config.getConfig(configfile);
JavaPairReceiverInputDStream<String, byte[]> rawStream = setupRawStreamFromKafka(
config, config.getProperty("group.id"));
LOG.info("database config:" + config.toString());
rawStream.foreachRDD(new Function<JavaPairRDD<String, byte[]>, Void>() {
@Override
public Void call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception {
JavaRDD<Map<String, ?>> es = stringJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, byte[]>, DBKey, DBData>() {
public Tuple2<DBKey, DBData> call(Tuple2<String, byte[]> stringTuple2) throws Exception {
String[] database = TAB.split(new String(stringTuple2._2));
DBKey dbKey = new DBKey();
DBData dbData = new DBData();
String sqlString = new String(Base64.decodeBase64(database[10].trim()));
String storageSql;
if(sqlString.length() > 1000){
storageSql = sqlString.substring(0,1000);
}else{
storageSql = sqlString;
}
//DBKey
dbKey.setProbeName(database[0].trim());
dbKey.setCustomService(database[1].trim());
dbKey.setIpClient(database[2].trim());
dbKey.setIpServer(database[3].trim());
dbKey.setPortServer(database[5].trim());
dbKey.setTimeStart(format.format(new Date().getTime()));
dbKey.setOperateType(storageSql.split(" ")[0]); //Select, Insert, Update, Drop, Procedure
dbKey.setDbType(database[8].trim());
dbKey.setResponseCode(database[9].trim());
dbKey.setUser(database[2].trim());
dbKey.setSqlString(storageSql);
if(!database[12].trim().equals("-")) {
dbData.setOperateTime(Double.parseDouble(database[12].trim()));
}else if(!database[7].trim().equals("-")){
dbData.setOperateTime(Double.parseDouble(database[7].trim()) - Double.parseDouble(database[6].trim()));
}else{
dbData.setOperateTime(0);
}
if(!database[13].trim().equals("-")) {
dbData.setReqTransTime(Double.parseDouble(database[13].trim()));
}else{
dbData.setReqTransTime(0);
}
if(!database[14].trim().equals("-")) {
dbData.setRespTransTime(Double.parseDouble(database[14].trim()));
}else{
dbData.setRespTransTime(0);
}
if(!database[15].trim().equals("-")) {
dbData.setRespPayload(Integer.parseInt(database[15].trim()));
}else{
dbData.setRespPayload(0);
}
dbData.setCount(1);
dbData.setSlowCount(1);
return new Tuple2<>(dbKey,dbData);
}
}).filter(new Function<Tuple2<DBKey, DBData>, Boolean>() {
@Override
public Boolean call(Tuple2<DBKey, DBData> v1) throws Exception {
return v1 != null;
}
}).reduceByKey(new Function2<DBData, DBData, DBData>() {
public DBData call(DBData v1, DBData v2) throws Exception {
DBData result = new DBData();
result.setOperateTime(v1.getOperateTime() + v2.getOperateTime());
result.setReqTransTime(v1.getReqTransTime() + v2.getReqTransTime());
result.setRespTransTime(v1.getRespTransTime() + v2.getRespTransTime());
result.setRespPayload(v1.getRespPayload() + v2.getRespPayload());
result.setCount(v1.getCount() + v2.getCount());
result.setSlowCount(v1.getSlowCount() + v2.getSlowCount());
return result;
}
}).map(new Function<Tuple2<DBKey,DBData>, Map<String, ?>>() {
public Map<String, ?> call(Tuple2<DBKey, DBData> v1) throws Exception {
DBKey dbKey = v1._1;
DBData dbData = v1._2;
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
builder.put("index_name", sdf.format(format.parse(dbKey.getTimeStart())));
builder.put("probeName",dbKey.getProbeName());
builder.put("customService",dbKey.getCustomService());
builder.put("ipClient",dbKey.getIpClient());
builder.put("ipServer",dbKey.getIpServer());
builder.put("portServer",dbKey.getPortServer());
builder.put("operateType",dbKey.getOperateType());
builder.put("timeStart",format.parse(dbKey.getTimeStart()));
builder.put("dbType",dbKey.getDbType());
builder.put("user",dbKey.getUser());
builder.put("responseCode",dbKey.getResponseCode());
builder.put("sqlString",dbKey.getSqlString());
builder.put("operateTime",dbData.getOperateTime());
builder.put("reqTransTime",dbData.getReqTransTime());
builder.put("respTransTime",dbData.getRespTransTime());
builder.put("respPayload",dbData.getRespPayload());
builder.put("count",dbData.getCount());
builder.put("slowCount",dbData.getSlowCount());
return builder.build();
}
}).cache();
if (es != null) {
JavaEsSpark.saveToEs(es, "ni-database-{index_name}/database", ImmutableMap.of
(ConfigurationOptions.ES_MAPPING_EXCLUDE, "index_name"));
}
return null;
}
});
rawStream.context().start();
rawStream.context().awaitTermination();
}
}
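A sketch of how the job can be submitted and how the per-minute figures below can be sampled, assuming the job is packaged as an uber-jar (the jar name is a placeholder) and writes into the daily ni-database-* indices created above; the 2 GB / 2 core settings match the job resource allocation noted in the results.
# submit the streaming job to the standalone master (jar name is a placeholder)
/data/mdware/spark/bin/spark-submit \
  --master spark://localhost:7077 \
  --executor-memory 2g \
  --total-executor-cores 2 \
  --class SSDPerformanceTest \
  ssd-performance-test.jar

# events written per minute: difference between two document counts taken one minute apart
curl -s 'http://localhost:9200/ni-database-*/_count?pretty'
sleep 60
curl -s 'http://localhost:9200/ni-database-*/_count?pretty'

# data volume written: index size on disk
curl -s 'http://localhost:9200/_cat/indices/ni-database-*?v&h=index,docs.count,store.size'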
Test environments
Test environment 1: virtual machine (8 GB RAM, 2 cores, no SSD)
- Data volume written per minute
- Events written per minute
Test environment 2: virtual machine (8 GB RAM, 2 cores, SSD)
- Data volume written per minute
- Events written per minute
Test environment 3: IBM server (126 GB RAM, 16 cores, no SSD)
- Data volume written per minute
- Events written per minute
Test environment 4: IBM server (126 GB RAM, 16 cores, ~160 GB SSD)
Job resource allocation: 2 GB memory, 2 cores
- Data volume written per minute (not recorded)
- Events written per minute
  - database data written alone
  - database and tcp data written together