
spark-kafka-es interaction

The Spark Streaming job below consumes JSON messages from a Kafka topic starting at a fixed offset, looks up a matching document in Elasticsearch for each message, and writes the merged result back into another Elasticsearch index.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.Seconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.SparkConf
//import org.elasticsearch._
import com.alibaba.fastjson.JSONObject
import com.alibaba.fastjson.JSON._
import java.text.SimpleDateFormat
import org.elasticsearch.spark.rdd.EsSpark
import org.apache.kafka.common.TopicPartition

object stu_course_test {

  // Placeholders: the original post does not show these values.
  // Replace with the real Elasticsearch/Kafka host and SASL credentials.
  val ip = "your-host"
  val usrname = "your-username"
  val psw = "your-password"

  // Parse a "yyyy-MM-dd HH:mm:ss" timestamp into epoch seconds.
  def tranTimeToLong(tm: String): Long = {
    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dt = fm.parse(tm)
    dt.getTime() / 1000
  }

  def main(args: Array[String]) {
    // ES connection settings go into the SparkConf so that EsSpark can pick them up.
    val conf = new SparkConf()
      .setAppName("stu_live_test5")
      .set("es.nodes", ip)
      .set("es.port", "9200")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Kafka consumer configuration (SASL/PLAIN).
    // Note: "sasl.plain.username"/"sasl.plain.password" are not standard Kafka client
    // properties; a stock consumer usually takes the credentials via "sasl.jaas.config".
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> ip,
      "group.id" -> "test_kafka1106",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "sasl.plain.username" -> usrname,
      "sasl.plain.password" -> psw,
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.mechanism" -> "PLAIN"
      // "auto.offset.reset" -> "earliest",
      // "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val tops = "topic_name"
    val topics = tops.split(",").toSet

    // Start reading from a fixed offset on partition 0 of the topic.
    val fromOffsets = Map[TopicPartition, Long](new TopicPartition(tops, 0) -> 20385338L)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

    val lines = stream.map(record => record.value)
    val offs = stream.map(off => off.offset)
    offs.print()
    lines.print()

    lines.foreachRDD(record => {
      // Collect the micro-batch to the driver and process each message there.
      val datas = record.collect()
      if (record.count() > 0) {
        for (i <- datas) {
          // The Kafka message is a JSON envelope; the payload sits under "data".
          val dict = parseObject(parseObject(i).get("data").toString)
          val stu_data = new JSONObject()
          stu_data.put("a", dict.get("a").toString.toInt)
          stu_data.put("b", dict.get("b").toString.toInt)
          stu_data.put("c", dict.get("c").toString)
          stu_data.put("d", dict.get("d").toString.toInt)
          stu_data.put("time", tranTimeToLong(dict.get("time").toString).toInt)
          stu_data.put("e", dict.get("e").toString.toInt)

          // Look up the matching document in ES. Note that "keyid" is never put into
          // stu_data above, so this should reference whichever of the (anonymised)
          // fields actually holds the key.
          val query = """{"query":{"bool":{"must":[{"term":{"key":""" +
            stu_data.get("keyid").toString + """}},{"term":{"status":2}}]}}}"""
          println(query)
          val es_result = EsSpark.esRDD(ssc.sparkContext, "index_name/all-type", query)

          es_result.collect().foreach(course => {
            // Merge the ES lookup result into the Kafka record and write it back to ES,
            // using a composite _id so re-processing overwrites the same document.
            stu_data.put("aa", course._2("aa").toString)
            stu_data.put("bb", course._2("bb").toString)
            stu_data.put("cc", course._2("cc").toString.toInt)
            val _id = stu_data.get("aa").toString + "_" + stu_data.get("bb") + "_" + stu_data.get("cc")
            stu_data.put("_id", _id)
            val rdd = ssc.sparkContext.makeRDD(Seq(stu_data.toString))
            EsSpark.saveJsonToEs(rdd, "test_index_name/docs", Map("es.mapping.id" -> "_id"))
          })
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
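
The job above pins the starting offset by hand and leaves auto-commit disabled, so nothing records how far the consumer group has read. One common way to close that loop with the spark-streaming-kafka-0-10 API is to commit the processed offset ranges back to Kafka from the driver. A minimal sketch, reusing the `stream` created with `KafkaUtils.createDirectStream` above:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Commit the offsets of each micro-batch back to Kafka once it has been processed.
// Progress is stored under the consumer group ("test_kafka1106" above), so a restarted
// job can resume from the last committed position instead of a hard-coded offset.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // commitAsync must be called on the driver, after the batch's output has completed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}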